UNPKG

4.97 kBPlain TextView Raw
1import { connect, Message, Connection } from "amqplib";
2
3import { Source, SourceCallback, SourceOptions } from "@neoskop/paperboy";
4
5import { fetchDamAssets } from "./dam.util";
6import { MagnoliaSourceOptions } from "./magnolia-source-options.interface";
7import {
8 fetchPages,
9 fetchSitemap,
10 fetchWorkspace,
11 sanitizeJson,
12 writePagesFile,
13 writeWorkspaceFile
14} from "./pages.util";
15import * as retry from "retry";
16
17import AsyncLock = require("async-lock");
18
/**
 * `Source` implementation that builds its output from the helpers in
 * `./pages.util` and `./dam.util` and regenerates it whenever a message
 * arrives on the configured AMQP queue.
 */
export class MagnoliaSource implements Source {
  private readonly options: MagnoliaSourceOptions;
  private readonly callback: SourceCallback;
  // `maxPending: 1` makes the lock reject further acquisitions once one
  // generation is already waiting behind the running one.
  private readonly generationLock: AsyncLock = new AsyncLock({ maxPending: 1 });

  /**
   * @param options Source options; treated as `MagnoliaSourceOptions`.
   * @param callback Invoked after every generation triggered by a queue message.
   */
  constructor(options: SourceOptions, callback: SourceCallback) {
    this.options = <MagnoliaSourceOptions>options;
    this.callback = callback;
  }

  /**
   * Fetches the sitemap, the pages and all configured workspaces, resolves
   * the DAM assets they reference, and writes the sanitized pages file and
   * one file per workspace.
   *
   * @returns A promise that resolves once all files are written and rejects
   *          as soon as any fetch or write step fails.
   */
  public async generate(): Promise<void> {
    const sitemap = await fetchSitemap(this.options);
    const website = await fetchPages(this.options);

    // Keep sitemap order; drop paths without a matching page. `website` may
    // be falsy (including `null`), so both `undefined` and `null` are removed.
    const pages = sitemap
      .map(
        path => website && website.find((page: any) => page["@path"] === path)
      )
      .filter(page => page !== undefined && page !== null);

    const workspaces: { [workspace: string]: any } = {};

    if (this.options.magnolia.workspaces) {
      for (const workspace of this.options.magnolia.workspaces) {
        workspaces[workspace] = await fetchWorkspace(workspace, this.options);
      }
    }

    // Collect every `jcr:<uuid>` reference that occurs anywhere in the pages
    // or workspace nodes by scanning their JSON serialization.
    const nodes = pages.concat(
      Object.keys(workspaces).reduce<any[]>(
        (prev, current) => prev.concat(workspaces[current]),
        []
      )
    );
    const match = JSON.stringify(nodes).match(
      /jcr:([0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})/g
    );

    // Strip the `jcr:` prefix and de-duplicate (first occurrence wins).
    const damUuids = Array.from(
      new Set(match ? match.map(id => id.substring(4)) : [])
    );

    const damAssets = await fetchDamAssets(damUuids, this.options);
    const pagesObj: any = pages.map((page: any) =>
      sanitizeJson(page, damAssets, pages, this.options, workspaces)
    );

    await writePagesFile(pagesObj, this.options);

    if (this.options.magnolia.workspaces) {
      for (const workspace of Object.keys(workspaces)) {
        const workspaceData = workspaces[workspace];

        if (workspaceData) {
          const sanitized: any[] = [];

          for (const item of workspaceData) {
            sanitized.push(
              sanitizeJson(
                item,
                damAssets,
                workspaceData,
                this.options,
                workspaces
              )
            );
          }

          await writeWorkspaceFile(workspace, sanitized, this.options);
        }
      }
    }
  }

  /**
   * Connects to the queue, binds an auto-delete queue to the fanout exchange
   * (`options.queue.exchangeName`, default `"paperboy"`) and starts consuming.
   * Connection attempts are retried forever; a dropped connection triggers a
   * fresh `start()`.
   *
   * @returns A promise that resolves as soon as the first attempt has been
   *          kicked off; it does not wait for the connection to be ready.
   */
  public async start(): Promise<void> {
    const operation = retry.operation({ forever: true });
    const exchangeName = this.options.queue.exchangeName || "paperboy";
    let conn: Connection;

    operation.attempt(
      async () => {
        let established: Connection | undefined;
        // Set once the loss of this attempt's connection has been dealt with,
        // so that it is recovered exactly once.
        let handled = false;

        try {
          const connection = await connect(this.options.queue.uri);
          established = connection;
          conn = connection;

          // A failing connection emits "error" followed by "close"; without
          // the guard every failure would start two reconnections.
          const onDropped = () => {
            if (handled) {
              return;
            }
            handled = true;
            this.retryConnection();
          };
          ["error", "close"].forEach($event =>
            connection.on($event, onDropped)
          );

          const channel = await connection.createChannel();

          await channel.assertExchange(exchangeName, "fanout", {
            durable: false
          });

          // An empty name asks the server to generate one.
          const qok = await channel.assertQueue("", {
            autoDelete: true
          });

          // Awaited so that a failure here is caught and retried below.
          await channel.bindQueue(qok.queue, exchangeName, "");

          await channel.consume(qok.queue, this.consumeMessage.bind(this), {
            noAck: true
          });
        } catch (error) {
          const reconnectAlreadyStarted = handled;
          // Silence this attempt's drop handlers before closing the
          // connection, otherwise the "close" event would reconnect as well.
          handled = true;

          if (established) {
            void this.closeQuietly(established);
          }

          if (reconnectAlreadyStarted) {
            // The drop handler already called start(); retrying here too
            // would open a second connection.
            console.error(`Could not establish connection to queue: ${error}`);
            return;
          }

          if (operation.retry(error)) {
            console.error(`Could not establish connection to queue: ${error}`);
            return;
          }
        }
      },
      {
        timeout: 10 * 1000,
        // NOTE(review): node-retry's README names this option `cb`; confirm
        // that `callback` is honoured by the installed version.
        callback: () => {
          if (conn) {
            void this.closeQuietly(conn);
          }
        }
      }
    );
  }

  /** Logs the connection loss and starts a fresh connection cycle. */
  private retryConnection() {
    console.info("Connection to queue dropped...");
    void this.start();
  }

  /**
   * Closes a connection on a best-effort basis; failures (for example because
   * the connection is already closed) are intentionally ignored.
   */
  private async closeQuietly(connection: Connection): Promise<void> {
    try {
      await connection.close();
    } catch (err) {
      // Nothing left to clean up if closing fails.
    }
  }

  /**
   * Handles one queue message by running `generate()` followed by the source
   * callback under the generation lock. The message is discarded when the
   * lock refuses the acquisition.
   *
   * @param message The delivered message, or `null` when the consumer was
   *                cancelled by the broker.
   */
  private consumeMessage(message: Message | null) {
    if (!message) {
      // Nothing to log or process; previously this dereferenced `null`.
      console.warn("Consumer was cancelled by the queue server.");
      return;
    }

    console.info(
      "[x] from Magnolia: %s -> '%s'",
      message.fields.routingKey,
      message.content.toString()
    );

    this.generationLock.acquire(
      "generationLock",
      async done => {
        try {
          await this.generate();
          await this.callback();
        } catch (err) {
          console.error("Generation failed.", err);
        } finally {
          // Always release the lock, even when generation failed.
          done();
        }
      },
      err => {
        if (err) {
          console.info("Already another pending message. Message discarded!");
        }
      }
    );
  }
}