UNPKG

4.37 kBPlain TextView Raw
1import { connect, Message } from 'amqplib';
2
3import { Source, SourceCallback, SourceOptions } from '@neoskop/paperboy';
4
5import { fetchDamAssets } from './dam.util';
6import { MagnoliaSourceOptions } from './magnolia-source-options.interface';
7import {
8 fetchPages, fetchSitemap, fetchWorkspace, sanitizeJson, writePagesFile, writeWorkspaceFile
9} from './pages.util';
10
11import AsyncLock = require('async-lock');
12
13export class MagnoliaSource implements Source {
14 private readonly options: MagnoliaSourceOptions;
15 private readonly callback: SourceCallback;
16 private readonly generationLock: AsyncLock = new AsyncLock({ maxPending: 1 });
17
18 constructor(options: SourceOptions, callback: SourceCallback) {
19 this.options = <MagnoliaSourceOptions>options;
20 this.callback = callback;
21 }
22
23 public generate(): Promise<void> {
24 return new Promise(async resolve => {
25 const sitemap = await fetchSitemap(this.options);
26 const website = await fetchPages(this.options);
27 const pages = sitemap
28 .map(path => website.find((page: any) => page['@path'] === path))
29 .filter(page => typeof page !== 'undefined');
30
31 const workspaces: { [workspace: string]: any } = {};
32
33 if (this.options.magnolia.workspaces) {
34 for (const workspace of this.options.magnolia.workspaces) {
35 workspaces[workspace] = await fetchWorkspace(workspace, this.options);
36 }
37 }
38
39 // get dam jcr ids
40 const nodes = pages.concat(
41 Object.keys(workspaces).reduce((prev, current) => prev.concat(workspaces[current]), [])
42 );
43 const match = JSON.stringify(nodes).match(
44 /jcr:([0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})/g
45 );
46
47 let damUuids = match ? match.map(id => id.substring(4)) : [];
48 damUuids = damUuids.filter((id, pos) => {
49 return damUuids.indexOf(id) === pos;
50 });
51
52 const damAssets = await fetchDamAssets(damUuids, this.options);
53 const pagesObj: any = pages.map((page: any) =>
54 sanitizeJson(page, damAssets, pages, this.options, workspaces)
55 );
56
57 await writePagesFile(pagesObj, this.options);
58
59 if (this.options.magnolia.workspaces) {
60 for (const workspace of Object.keys(workspaces)) {
61 const workspaceData = workspaces[workspace];
62
63 if (workspaceData) {
64 const sanitized: any[] = [];
65
66 for (const item of workspaceData) {
67 sanitized.push(
68 sanitizeJson(item, damAssets, workspaceData, this.options, workspaces)
69 );
70 }
71
72 await writeWorkspaceFile(workspace, sanitized, this.options);
73 }
74 }
75 }
76
77 resolve();
78 });
79 }
80
81 public async start(): Promise<void> {
82 connect(this.options.queue.uri)
83 .then(conn => {
84 conn.on('error', this.retryConnection.bind(this, this.options.queue));
85 return conn.createChannel();
86 })
87 .then(channel => {
88 channel
89 .assertExchange(this.options.queue.exchangeName || 'paperboy', 'fanout', {
90 durable: false
91 })
92 .then(() => {
93 return channel.assertQueue(null, {
94 autoDelete: true
95 });
96 })
97 .then(qok => {
98 channel.bindQueue(qok.queue, this.options.queue.exchangeName || 'paperboy', '');
99 channel.consume(qok.queue, this.consumeMessage.bind(this), {
100 noAck: true
101 });
102 });
103 })
104 .catch(() => {
105 this.retryConnection();
106 });
107 }
108
109 private retryConnection() {
110 console.info('Connection to queue failed, will retry in 10s...');
111 setTimeout(() => {
112 this.start();
113 }, 10000);
114 }
115
116 private consumeMessage(message: Message | null) {
117 console.info(
118 "[x] from Magnolia: %s -> '%s'",
119 message.fields.routingKey,
120 message.content.toString()
121 );
122
123 this.generationLock.acquire(
124 'generationLock',
125 done => {
126 this.generate()
127 .then(() => {
128 this.callback().then(() => done());
129 })
130 .catch(err => {
131 console.error('Generation failed.', err);
132 done();
133 });
134 },
135 (err, ret) => {
136 if (err) {
137 console.info('Already another pending message. Message discarded!');
138 }
139 }
140 );
141 }
142}