UNPKG

5.07 kBPlain TextView Raw
1import { Source, SourceCallback, SourceOptions } from "@neoskop/paperboy";
2import { connect, Connection, Message } from "amqplib";
3import * as retry from "retry";
4import { fetchDamAssets } from "./dam.util";
5import { MagnoliaSourceOptions } from "./magnolia-source-options.interface";
6import {
7 fetchPages,
8 fetchSitemap,
9 fetchWorkspace,
10 sanitizeJson,
11 writePagesFile,
12 writeWorkspaceFile
13} from "./pages.util";
14
15import AsyncLock = require("async-lock");
16
17export class MagnoliaSource implements Source {
18 private readonly options: MagnoliaSourceOptions;
19 private readonly callback: SourceCallback;
20 private readonly generationLock: AsyncLock = new AsyncLock({ maxPending: 1 });
21
22 constructor(options: SourceOptions, callback: SourceCallback) {
23 this.options = <MagnoliaSourceOptions>options;
24 this.callback = callback;
25 }
26
27 public generate(): Promise<void> {
28 return new Promise(async resolve => {
29 const sitemap = await fetchSitemap(this.options);
30 const website = await fetchPages(this.options);
31 const pages = sitemap
32 .map(
33 path => website && website.find((page: any) => page["@path"] === path)
34 )
35 .filter(page => typeof page !== "undefined");
36
37 const workspaces: { [workspace: string]: any } = {};
38
39 if (this.options.magnolia.workspaces) {
40 for (const workspace of this.options.magnolia.workspaces) {
41 workspaces[workspace] = await fetchWorkspace(workspace, this.options);
42 }
43 }
44
45 // get dam jcr ids
46 const nodes = pages.concat(
47 Object.keys(workspaces).reduce(
48 (prev, current) => prev.concat(workspaces[current]),
49 []
50 )
51 );
52 const match = JSON.stringify(nodes).match(
53 /jcr:([0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12})/g
54 );
55
56 let damUuids = match ? match.map(id => id.substring(4)) : [];
57 damUuids = damUuids.filter((id, pos) => {
58 return damUuids.indexOf(id) === pos;
59 });
60
61 const damAssets = await fetchDamAssets(damUuids, this.options);
62 const pagesObj: any = pages.map((page: any) =>
63 sanitizeJson(page, damAssets, pages, this.options, workspaces)
64 );
65
66 await writePagesFile(pagesObj, this.options);
67
68 if (this.options.magnolia.workspaces) {
69 for (const workspace of Object.keys(workspaces)) {
70 const workspaceData = workspaces[workspace];
71
72 if (workspaceData) {
73 const sanitized: any[] = [];
74
75 for (const item of workspaceData) {
76 sanitized.push(
77 sanitizeJson(
78 item,
79 damAssets,
80 workspaceData,
81 this.options,
82 workspaces
83 )
84 );
85 }
86
87 await writeWorkspaceFile(workspace, sanitized, this.options);
88 }
89 }
90 }
91
92 resolve();
93 });
94 }
95
96 public async start(): Promise<void> {
97 const operation = retry.operation({ forever: true });
98 let conn: Connection;
99 operation.attempt(
100 async () => {
101 try {
102 conn = await connect(this.options.queue.uri);
103 const channel = await conn.createChannel();
104
105 await channel.assertExchange(
106 this.options.queue.exchangeName || "paperboy",
107 "fanout",
108 {
109 durable: false
110 }
111 );
112
113 const qok = await channel.assertQueue(null, {
114 autoDelete: true
115 });
116
117 channel.bindQueue(
118 qok.queue,
119 this.options.queue.exchangeName || "paperboy",
120 ""
121 );
122
123 channel.consume(qok.queue, this.consumeMessage.bind(this), {
124 noAck: true
125 });
126
127 ["error", "close"].forEach($event =>
128 conn.once(
129 $event,
130 this.retryConnection.bind(this, this.options.queue)
131 )
132 );
133 } catch (error) {
134 if (operation.retry(error)) {
135 console.error(`Could not establish connection to queue: ${error}`);
136 return;
137 }
138 }
139 },
140 {
141 timeout: 10 * 1000,
142 callback: () => {
143 if (conn) {
144 conn.close();
145 }
146 }
147 }
148 );
149 }
150
151 private retryConnection() {
152 console.log(
153 "Connection to queue dropped. Will start attempting to reconnect in 5 seconds."
154 );
155 setTimeout(this.start.bind(this), 5000);
156 }
157
158 private consumeMessage(message: Message | null) {
159 const content = JSON.parse(message.content.toString());
160 console.info(`[x] from ${content.source}`);
161
162 this.generationLock.acquire(
163 "generationLock",
164 async done => {
165 try {
166 await this.generate();
167 await this.callback();
168 } catch (err) {
169 console.error("Generation failed.", err);
170 }
171
172 done();
173 },
174 err => {
175 if (err) {
176 console.info("Already another pending message. Message discarded!");
177 }
178 }
179 );
180 }
181}