1 | import { Source, SourceCallback, SourceOptions } from "@neoskop/paperboy";
|
2 | import { connect, Connection, Message } from "amqplib";
|
3 | import * as retry from "retry";
|
4 | import { fetchDamAssets } from "./dam.util";
|
5 | import { MagnoliaSourceOptions } from "./magnolia-source-options.interface";
|
6 | import {
|
7 | fetchPages,
|
8 | fetchSitemap,
|
9 | fetchWorkspace,
|
10 | sanitizeJson,
|
11 | writePagesFile,
|
12 | writeWorkspaceFile
|
13 | } from "./pages.util";
|
14 |
|
15 | import AsyncLock = require("async-lock");
|
16 |
|
17 | export class MagnoliaSource implements Source {
|
18 | private readonly options: MagnoliaSourceOptions;
|
19 | private readonly callback: SourceCallback;
|
20 | private readonly generationLock: AsyncLock = new AsyncLock({ maxPending: 1 });
|
21 |
|
22 | constructor(options: SourceOptions, callback: SourceCallback) {
|
23 | this.options = <MagnoliaSourceOptions>options;
|
24 | this.callback = callback;
|
25 | }
|
26 |
|
27 | public generate(): Promise<void> {
|
28 | return new Promise(async resolve => {
|
29 | const sitemap = await fetchSitemap(this.options);
|
30 | const website = await fetchPages(this.options);
|
31 | const pages = sitemap
|
32 | .map(
|
33 | path => website && website.find((page: any) => page["@path"] === path)
|
34 | )
|
35 | .filter(page => typeof page !== "undefined");
|
36 |
|
37 | const workspaces: { [workspace: string]: any } = {};
|
38 |
|
39 | if (this.options.magnolia.workspaces) {
|
40 | for (const workspace of this.options.magnolia.workspaces) {
|
41 | workspaces[workspace] = await fetchWorkspace(workspace, this.options);
|
42 | }
|
43 | }
|
44 |
|
45 |
|
46 | const nodes = pages.concat(
|
47 | Object.keys(workspaces).reduce(
|
48 | (prev, current) => prev.concat(workspaces[current]),
|
49 | []
|
50 | )
|
51 | );
|
52 | const match = JSON.stringify(nodes).match(
|
53 | /jcr:([0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})/g
|
54 | );
|
55 |
|
56 | let damUuids = match ? match.map(id => id.substring(4)) : [];
|
57 | damUuids = damUuids.filter((id, pos) => {
|
58 | return damUuids.indexOf(id) === pos;
|
59 | });
|
60 |
|
61 | const damAssets = await fetchDamAssets(damUuids, this.options);
|
62 | const pagesObj: any = pages.map((page: any) =>
|
63 | sanitizeJson(page, damAssets, pages, this.options, workspaces)
|
64 | );
|
65 |
|
66 | await writePagesFile(pagesObj, this.options);
|
67 |
|
68 | if (this.options.magnolia.workspaces) {
|
69 | for (const workspace of Object.keys(workspaces)) {
|
70 | const workspaceData = workspaces[workspace];
|
71 |
|
72 | if (workspaceData) {
|
73 | const sanitized: any[] = [];
|
74 |
|
75 | for (const item of workspaceData) {
|
76 | sanitized.push(
|
77 | sanitizeJson(
|
78 | item,
|
79 | damAssets,
|
80 | workspaceData,
|
81 | this.options,
|
82 | workspaces
|
83 | )
|
84 | );
|
85 | }
|
86 |
|
87 | await writeWorkspaceFile(workspace, sanitized, this.options);
|
88 | }
|
89 | }
|
90 | }
|
91 |
|
92 | resolve();
|
93 | });
|
94 | }
|
95 |
|
96 | public async start(): Promise<void> {
|
97 | const operation = retry.operation({ forever: true });
|
98 | let conn: Connection;
|
99 | operation.attempt(
|
100 | async () => {
|
101 | try {
|
102 | conn = await connect(this.options.queue.uri);
|
103 | const channel = await conn.createChannel();
|
104 |
|
105 | await channel.assertExchange(
|
106 | this.options.queue.exchangeName || "paperboy",
|
107 | "fanout",
|
108 | {
|
109 | durable: false
|
110 | }
|
111 | );
|
112 |
|
113 | const qok = await channel.assertQueue(null, {
|
114 | autoDelete: true
|
115 | });
|
116 |
|
117 | channel.bindQueue(
|
118 | qok.queue,
|
119 | this.options.queue.exchangeName || "paperboy",
|
120 | ""
|
121 | );
|
122 |
|
123 | channel.consume(qok.queue, this.consumeMessage.bind(this), {
|
124 | noAck: true
|
125 | });
|
126 |
|
127 | ["error", "close"].forEach($event =>
|
128 | conn.once(
|
129 | $event,
|
130 | this.retryConnection.bind(this, this.options.queue)
|
131 | )
|
132 | );
|
133 | } catch (error) {
|
134 | if (operation.retry(error)) {
|
135 | console.error(`Could not establish connection to queue: ${error}`);
|
136 | return;
|
137 | }
|
138 | }
|
139 | },
|
140 | {
|
141 | timeout: 10 * 1000,
|
142 | callback: () => {
|
143 | if (conn) {
|
144 | conn.close();
|
145 | }
|
146 | }
|
147 | }
|
148 | );
|
149 | }
|
150 |
|
151 | private retryConnection() {
|
152 | console.log(
|
153 | "Connection to queue dropped. Will start attempting to reconnect in 5 seconds."
|
154 | );
|
155 | setTimeout(this.start.bind(this), 5000);
|
156 | }
|
157 |
|
158 | private consumeMessage(message: Message | null) {
|
159 | const content = JSON.parse(message.content.toString());
|
160 | console.info(`[x] from ${content.source}`);
|
161 |
|
162 | this.generationLock.acquire(
|
163 | "generationLock",
|
164 | async done => {
|
165 | try {
|
166 | await this.generate();
|
167 | await this.callback();
|
168 | } catch (err) {
|
169 | console.error("Generation failed.", err);
|
170 | }
|
171 |
|
172 | done();
|
173 | },
|
174 | err => {
|
175 | if (err) {
|
176 | console.info("Already another pending message. Message discarded!");
|
177 | }
|
178 | }
|
179 | );
|
180 | }
|
181 | }
|