UNPKG

20.6 kBJavaScriptView Raw
1var async = require("async");
2var cluster = require("cluster");
3
/**
 * Determines which host an invalidation should be scoped to.
 *
 * When virtual hosts are not enabled in process.configuration, the standalone host from the
 * CLOUDCMS_STANDALONE_HOST environment variable is returned.  Otherwise the host is taken from the
 * notification item and, if the item yields nothing, from the optional invalidation object.  For
 * either source an explicit "host" wins; failing that, a "tenantDnsSlug" is expanded by appending
 * ".cloudcms.net".
 *
 * @param {Object} item the notification item
 * @param {Object} [obj] an individual invalidation object carried by the item
 * @returns {String} the host, or a falsy value (such as undefined) when none could be determined
 */
var determineHost = function(item, obj)
{
    var virtualHost = process.configuration.virtualHost;

    // if virtual hosts aren't enabled, then host is the standalone host ("local")
    if (!virtualHost || !virtualHost.enabled)
    {
        return process.env.CLOUDCMS_STANDALONE_HOST;
    }

    // explicit host first, then a value imputed from the tenant DNS slug
    var hostOf = function(source)
    {
        if (!source.host && source.tenantDnsSlug)
        {
            return source.tenantDnsSlug + ".cloudcms.net";
        }

        return source.host;
    };

    var host = hostOf(item);

    // if the item told us nothing, maybe the invalidation object can
    if (!host && obj)
    {
        host = hostOf(obj);
    }

    return host;
};
44
/**
 * Works out the repository, branch and node ids for a node invalidation.
 *
 * The ids are read from the source's "repositoryId", "branchId" and "id" properties.  If any of
 * them is missing, all three are instead taken from the last three "/"-separated segments of the
 * source's "ref" (after the "://" separator).
 *
 * @param {Object} source the notification item or invalidation object describing the node
 * @returns {Object} an object with "nodeId", "branchId" and "repositoryId" properties
 */
var resolveNodeCoordinates = function(source)
{
    var coordinates = {
        "nodeId": source.id,
        "branchId": source.branchId,
        "repositoryId": source.repositoryId
    };

    // TEMP: some legacy support to aid in transition
    if (!coordinates.repositoryId || !coordinates.branchId || !coordinates.nodeId)
    {
        // without a string ref there is nothing to parse, so keep whatever ids were supplied
        // rather than throwing from inside the series
        if (typeof(source.ref) === "string")
        {
            var identifier = source.ref.substring(source.ref.indexOf("://") + 3);
            var parts = identifier.split("/").reverse();

            coordinates.nodeId = parts[0];
            coordinates.branchId = parts[1];
            coordinates.repositoryId = parts[2];
        }
    }

    return coordinates;
};

/**
 * Broadcasts a "node_invalidation" message for a node.
 *
 * @param {Object} source the notification item or invalidation object describing the node
 * @param {String} host the host the invalidation applies to
 * @param {Function} callback handed to process.broadcast.publish as its completion callback
 */
var publishNodeInvalidation = function(source, host, callback)
{
    var coordinates = resolveNodeCoordinates(source);

    // broadcast invalidation
    process.broadcast.publish("node_invalidation", {
        "ref": source.ref,
        "nodeId": coordinates.nodeId,
        "branchId": coordinates.branchId,
        "repositoryId": coordinates.repositoryId,
        "isMasterBranch": source.isMasterBranch,
        "host": host,
        "paths": source.paths || {}
    }, callback);
};

/**
 * Broadcasts an "invalidate_page_rendition" message for a single page rendition.
 *
 * Nothing is published when the source lacks a repositoryId or branchId; the skip is logged and
 * the callback is invoked without an error.
 *
 * @param {Object} source the notification item or invalidation object describing the rendition
 * @param {String} host the host the invalidation applies to
 * @param {String} skipLabel marker appended to the "skipping" log line to identify the call site
 * @param {Function} callback invoked with the publish error (if any) once publishing completes
 */
var publishPageRenditionInvalidation = function(source, host, skipLabel, callback)
{
    // SAFETY CHECK: if no repository and/or branch, just bail
    if (!source.repositoryId || !source.branchId)
    {
        console.log("Missing repositoryId or branchId, skipping WCM page invalidation (" + skipLabel + ")");
        return callback();
    }

    var message = {
        "key": source.key,
        "scope": source.scope,
        "pageCacheKey": source.pageCacheKey,
        "applicationId": source.applicationId,
        "deploymentKey": source.deploymentKey,
        "host": host,
        "repositoryId": source.repositoryId,
        "branchId": source.branchId,
        "isMasterBranch": source.isMasterBranch
    };

    // the fragment cache key is only included when one was supplied
    if (source.fragmentCacheKey)
    {
        message.fragmentCacheKey = source.fragmentCacheKey;
    }

    // broadcast invalidation
    process.broadcast.publish("invalidate_page_rendition", message, function(err) {
        console.log("published invalidate_page_rendition message. err:" + err + "\nmessage: " + JSON.stringify(message,null,2));
        callback(err);
    });
};

/**
 * Broadcasts the invalidation for one entry of an "invalidate_objects" notification.
 *
 * Entries of type "node", "settings", "application" and "uiconfig" are published; any other type
 * is ignored and the callback is invoked straight away.
 *
 * @param {Object} item the notification item that carries the entry
 * @param {Object} obj the invalidation entry
 * @param {Function} callback invoked once the entry has been dealt with
 */
var publishObjectInvalidation = function(item, obj, callback)
{
    var host = determineHost(item, obj);

    switch (obj.type)
    {
        case "node":
            console.log("Sending node invalidation for host: " + host);
            publishNodeInvalidation(obj, host, callback);
            break;

        case "settings":
            // broadcast invalidation
            process.broadcast.publish("settings_invalidation", {
                "ref": obj.ref,
                "settingsKey": obj.settingsKey,
                "settingsScope": obj.settingsScope,
                "host": host
            }, callback);
            break;

        case "application":
            // published without a completion callback, so the series moves on without waiting
            process.broadcast.publish("application_invalidation", {
                "ref": obj.ref,
                "applicationId": obj.applicationId,
                "deploymentKey": obj.deploymentKey,
                "host": host,
                "stackId": obj.stackId,
                "stackMembers": obj.stackMembers
            });
            callback();
            break;

        case "uiconfig":
            // broadcast invalidation
            process.broadcast.publish("uiconfig_invalidation", {
                "ref": obj.ref,
                "id": obj.id,
                "host": host
            }, callback);
            break;

        default:
            callback();
            break;
    }
};

/**
 * Runs a handler over each entry, one at a time and in order, using async.series.
 *
 * @param {Array} entries the entries to process
 * @param {Function} handler called as handler(entry, done) for each entry
 * @param {Function} callback invoked with the error (if any) reported by async.series
 */
var runInSeries = function(entries, handler, callback)
{
    // wrap the processing of each entry into a series function
    var toTask = function(entry)
    {
        return function(done) {
            handler(entry, done);
        };
    };

    var tasks = [];
    for (var i = 0; i < entries.length; i++)
    {
        tasks.push(toTask(entries[i]));
    }

    async.series(tasks, function(err) {
        callback(err);
    });
};

/**
 * Dispatches a single notification item according to its "operation".
 *
 * @param {Object} item the notification item
 * @param {Function} done invoked when the item has been handled; receives an error for publish
 *                        failures and for unknown operations
 */
var handleNotificationItem = function(item, done)
{
    var operation = item.operation;
    var invalidations = item.invalidations;

    /**
     * @deprecated support for "invalidate_object"
     * This is left in to support legacy installations of the API that are still doing things object-by-object.
     * Newer API versions do bulk operations and use the "invalidate_objects" notification event.
     *
     * This will be removed at some point in the future.  Please upgrade to the latest Cloud CMS API.
     */
    if (operation === "invalidate_object")
    {
        if (item.type !== "node")
        {
            return done();
        }

        return publishNodeInvalidation(item, determineHost(item), function(err) {
            return done(err);
        });
    }

    if (operation === "invalidate_objects")
    {
        if (!invalidations || invalidations.length === 0)
        {
            return done();
        }

        return runInSeries(invalidations, function(obj, next) {
            publishObjectInvalidation(item, obj, next);
        }, done);
    }

    if (operation === "invalidate_application")
    {
        // TODO: invalidate any cache dependent on application
        return done();
    }

    if (operation === "invalidate_application_page_rendition")
    {
        console.log("invalidate_application_page_rendition event\n" + JSON.stringify(item,null,2));

        return publishPageRenditionInvalidation(item, determineHost(item), "1", done);
    }

    if (operation === "invalidate_application_page_renditions")
    {
        console.log("invalidate_application_page_renditions event");

        // nothing to invalidate: still signal completion, otherwise the enclosing series never
        // advances and no further notifications get processed
        if (!invalidations || invalidations.length === 0)
        {
            return done();
        }

        return runInSeries(invalidations, function(obj, next) {
            publishPageRenditionInvalidation(obj, determineHost(item, obj), "2", next);
        }, done);
    }

    if (operation === "invalidate_application_all_page_renditions")
    {
        var message = {
            "applicationId": item.applicationId,
            "deploymentKey": item.deploymentKey,
            "scope": item.scope,
            "host": determineHost(item)
        };

        // broadcast invalidation
        return process.broadcast.publish("invalidate_all_page_renditions", message, function(err) {
            console.log("published invalidate_all_page_renditions message. err:" + err + "\nmessage: " + JSON.stringify(message,null,2));
            return done(err);
        });
    }

    console.log("Unknown notification item: " + JSON.stringify(item));

    // just assume it's something we can't deal with
    return done({
        "message": "Unknown notification item: " + item.rawMessage
    });
};

/**
 * Processes a batch of notification items one after another, broadcasting the matching
 * invalidation messages via process.broadcast.publish.
 *
 * Processing stops at the first item that reports an error (including items with an unknown
 * operation), and that error is handed to the callback.
 *
 * @param {Array} items the notification items; a falsy value is treated as "nothing to do"
 * @param {Function} callback invoked with the error (if any) once processing has finished
 */
var handleNotificationMessages = function(items, callback) {

    if (!items) {
        return callback();
    }

    // run all of the items in series
    runInSeries(items, handleNotificationItem, callback);
};
380
/**
 * Kicks off the next pass of the notification loop by handing both arguments straight to
 * runnerFn.
 *
 * @param {Object} provider the notification provider to poll
 * @param {Boolean} [printStartMessage] passed through to runnerFn unchanged
 * @returns whatever runnerFn returns
 */
var completeRunnerFn = function(provider, printStartMessage)
{
    var outcome = runnerFn(provider, printStartMessage);

    return outcome;
};
385
// number of passes scheduled so far by this process; only used to label log output
var runnerCount = 0;

/**
 * Schedules one pass of the notification loop, to run 500 milliseconds from now.
 *
 * A pass asks the provider for notification items via provider.process, hands any items to
 * handleNotificationMessages, reports the outcome back through the provider's post-handle callback
 * and then schedules the next pass through completeRunnerFn.  A provider error or an empty batch
 * schedules the next pass immediately (without the start message).
 *
 * @param {Object} provider the notification provider; must expose process(callback)
 * @param {Boolean} [printStartMessage] whether to log a "Starting notifications loop" line when
 *                                      the pass begins
 */
var runnerFn = function(provider, printStartMessage)
{
    var wid = "main";
    if (cluster && cluster.worker)
    {
        wid = cluster.worker.id;
    }

    // capture this pass's number now, since runnerCount moves on as later passes are scheduled
    var logPrefix = "[" + wid + "][" + (runnerCount++) + "] ";

    var runner = function()
    {
        if (printStartMessage)
        {
            console.log(logPrefix + "Starting notifications loop");
        }

        provider.process(function(err, items, postHandleCallback) {

            if (err)
            {
                console.log(logPrefix + "Notification Provider error: " + err, err.stack);

                // start it up again
                return completeRunnerFn(provider);
            }

            if (!items) {
                items = [];
            }

            if (items.length === 0)
            {
                // start it up again
                return completeRunnerFn(provider, false);
            }

            console.log(logPrefix + "Notification Provider found: " + items.length + " notification items");

            handleNotificationMessages(items, function (err) {

                console.log(logPrefix + "Notification Provider handled: " + items.length + " items");

                postHandleCallback(err, items, function (err, handledItems, deletedItems) {

                    if (err)
                    {
                        console.log(logPrefix + "Notification Provider post-handling error: " + err);
                    }

                    // the arrays may be absent when an error is reported; reading .length on them
                    // would throw before the restart below and silently end the loop
                    var handledCount = handledItems ? handledItems.length : 0;
                    var deletedCount = deletedItems ? deletedItems.length : 0;

                    console.log(logPrefix + "Notification Provider completed - handled: " + handledCount + ", deleted: " + deletedCount);

                    // start it up again
                    return completeRunnerFn(provider, true);
                });
            });
        });
    };

    setTimeout(runner, 500);
};
445
446
447module.exports = function()
448{
449 var r = {};
450
451 r.start = function(callback) {
452
453 var config = process.configuration;
454 if (!config["notifications"])
455 {
456 config["notifications"] = {
457 "enabled": false,
458 "type": "",
459 "configuration": {}
460 };
461 }
462
463 var notifications = config["notifications"];
464 if (typeof(process.env.CLOUDCMS_NOTIFICATIONS_ENABLED) !== "undefined")
465 {
466 if (!process.env.CLOUDCMS_NOTIFICATIONS_ENABLED || process.env.CLOUDCMS_NOTIFICATIONS_ENABLED === "false")
467 {
468 notifications.enabled = false;
469 }
470 else if (process.env.CLOUDCMS_NOTIFICATIONS_ENABLED || process.env.CLOUDCMS_NOTIFICATIONS_ENABLED === "true")
471 {
472 notifications.enabled = true;
473 }
474 }
475
476 if (notifications.enabled)
477 {
478 if (process.env.CLOUDCMS_NOTIFICATIONS_TYPE)
479 {
480 notifications.type = process.env.CLOUDCMS_NOTIFICATIONS_TYPE;
481 }
482
483 if (!notifications.type)
484 {
485 console.error("Notification.type is not configured")
486 return callback();
487 }
488
489 var type = notifications.type;
490 var configuration = notifications.configuration;
491
492 var provider = require("./providers/" + type);
493 provider.start(configuration, function (err) {
494
495 if (err)
496 {
497 return callback(err);
498 }
499
500 // this starts the "thread" for the provider listener
501 runnerFn(provider, true);
502
503 callback();
504 });
505 }
506 else
507 {
508 callback();
509 }
510 };
511
512 return r;
513}();