UNPKG

8 kBJavaScriptView Raw
1/*
2
3 ----------------------------------------------------------------------------
4 | qewd: Quick and Easy Web Development |
5 | |
6 | Copyright (c) 2017 M/Gateway Developments Ltd, |
7 | Reigate, Surrey UK. |
8 | All rights reserved. |
9 | |
10 | http://www.mgateway.com |
11 | Email: rtweed@mgateway.com |
12 | |
13 | |
14 | Licensed under the Apache License, Version 2.0 (the "License"); |
15 | you may not use this file except in compliance with the License. |
16 | You may obtain a copy of the License at |
17 | |
18 | http://www.apache.org/licenses/LICENSE-2.0 |
19 | |
20 | Unless required by applicable law or agreed to in writing, software |
21 | distributed under the License is distributed on an "AS IS" BASIS, |
22 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
23 | See the License for the specific language governing permissions and |
24 | limitations under the License. |
25 ----------------------------------------------------------------------------
26
27 3 January 2017
28
29*/
30
/**
 * Records an incoming message in the queue backup document.
 *
 * Three nodes are written through this.db.set, all under the document named
 * by this.resilientMode.documentName:
 *   ['message', ix, 'content'] - the message serialised with JSON.stringify
 *   ['message', ix, 'token']   - message.token
 *   ['pending', token, ix]     - an empty-string marker for this message
 *
 * Each write is given its own node object. this.db.set takes a callback, so
 * it may well read its argument after this function has moved on; sharing and
 * mutating one object between the three calls would then make the earlier
 * writes pick up the later subscripts/data.
 *
 * @param {Object} message - the incoming message; its token property is read
 * @returns {string} the index generated for the message, in the form
 *   '<milliseconds since epoch>-<process.hrtime() reading in nanoseconds>'
 */
function storeIncomingMessage(message) {
  var documentName = this.resilientMode.documentName;

  // first, create a timestamp index for this message. The hrtime reading is
  // appended to the millisecond clock - presumably so that messages arriving
  // within the same millisecond still get distinct indices.
  var time = process.hrtime();
  var ix = new Date().getTime() + '-' + (time[0] * 1e9 + time[1]);

  // report failed writes instead of silently discarding them
  var reportError = function(error, result) {
    if (error) {
      console.error('storeIncomingMessage: error saving message ' + ix, error);
    }
  };

  this.db.set({
    global: documentName,
    subscripts: ['message', ix, 'content'],
    data: JSON.stringify(message)
  }, reportError);

  this.db.set({
    global: documentName,
    subscripts: ['message', ix, 'token'],
    data: message.token
  }, reportError);

  this.db.set({
    global: documentName,
    subscripts: ['pending', message.token, ix],
    data: ''
  }, reportError);

  return ix;
}
53
/**
 * Asks this.db.order for the node that follows glo and hands the outcome to
 * callback.
 *
 * The outcome is always logged. callback is only invoked when this.db.order
 * reports no error; on error nothing further happens.
 *
 * @param {Object} glo - node reference passed straight through to this.db.order
 * @param {Function} [callback] - called with the result object on success
 */
function nextValue(glo, callback) {
  var onOrdered = function(error, result) {
    console.log('nextValue - result = ' + JSON.stringify(result));
    if (error) {
      return;
    }
    if (callback) {
      callback(result);
    }
  };
  this.db.order(glo, onOrdered);
}
61
/**
 * Puts any pending messages recorded for a token back onto the queue.
 *
 * Walks the ['pending', token, *] records one at a time using nextValue.
 * For each record other than ix (the current, still active one):
 *   - the stored message is read from ['message', dbIndex, 'content'];
 *   - the pending record itself is deleted;
 *   - unless the message is missing or is of type 'ewd-register' /
 *     'ewd-reregister', it is passed to handleMessage, flagged with
 *     resubmitted = true when ['message', dbIndex, 'workerStatus'] holds
 *     'started'.
 *
 * The walk always moves on to the next pending record, including when a
 * record is skipped, so one unusable record cannot stop later messages from
 * being re-queued. It ends when nextValue reports an empty-string result.
 *
 * @param {string} token - token whose pending records are scanned
 * @param {string} ix - index of the current pending record, which is left alone
 * @param {Function} handleMessage - called with each message to be re-queued
 */
function requeueMessages(token, ix, handleMessage) {
  var ewdQueueDocument = this.resilientMode.documentName;
  var q = this;

  // put any pending messages with this token back onto the queue

  var pendingGlo = {
    global: ewdQueueDocument,
    subscripts: ['pending', token, '']
  };

  var callback = function(node) {
    var dbIndex = node.result;

    // no more pending records, so finish
    if (dbIndex === '') {
      return;
    }

    // repeat the process for the next pending record
    var next = function() {
      nextValue.call(q, node, callback);
    };

    // ignore the current latest active pending record
    if (dbIndex === ix) {
      next();
      return;
    }

    var contentGlo = {
      global: ewdQueueDocument,
      subscripts: ['message', dbIndex, 'content']
    };
    q.db.get(contentGlo, function(error, result) {
      // nothing usable stored for this record: skip it but keep scanning
      if (error || !result || !result.defined) {
        next();
        return;
      }

      var message = JSON.parse(result.data);
      // registration messages are never re-queued, but later records still are
      if (message.type === 'ewd-register' || message.type === 'ewd-reregister') {
        next();
        return;
      }

      // if message had been being processed by a worker but hadn't completed
      // then flag as a re-submission
      var statusGlo = {
        global: ewdQueueDocument,
        subscripts: ['message', dbIndex, 'workerStatus']
      };
      q.db.get(statusGlo, function(error, result) {
        if (!error && result && result.defined && result.data === 'started') {
          message.resubmitted = true;
        }
        // put back the message onto the queue
        handleMessage(message);
        next();
      });
    });

    // delete the pending record
    q.db.kill(node, function(error, result) {});
  };

  // start the loop through the pending messages for this token
  console.log('Checking to see if any messages need re-queueing');
  nextValue.call(this, pendingGlo, callback);
}
120
/**
 * Saves a response for a stored message and performs the related bookkeeping.
 *
 * In order:
 *   1. for an 'ewd-reregister' response, re-queues the token's pending
 *      messages via requeueMessages;
 *   2. saves the JSON-serialised response via saveResponse;
 *   3. when resultObj.finished is truthy, removes the message's pending
 *      record via removePendingIndex.
 *
 * @param {Object} resultObj - the response; its type and finished properties are read
 * @param {string} token - token the message belongs to
 * @param {string} ix - index of the stored message
 * @param {*} count - passed to saveResponse as the response's subscript
 * @param {Function} handleMessage - passed on to requeueMessages
 */
function storeResponse(resultObj, token, ix, count, handleMessage) {
  var self = this;

  var isReregistration = (resultObj.type === 'ewd-reregister');
  if (isReregistration) {
    // re-queue any pending messages for this token
    requeueMessages.call(self, token, ix, handleMessage);
  }

  // save the response to database
  var serialisedResponse = JSON.stringify(resultObj);
  saveResponse.call(self, ix, count, serialisedResponse);

  if (!resultObj.finished) {
    return;
  }
  // remove the "pending" index record
  removePendingIndex.call(self, token, ix);
}
133
/**
 * Writes one response for a stored message to the queue backup document, at
 * ['message', ix, 'response', count] under this.resilientMode.documentName.
 *
 * @param {string} ix - index of the stored message
 * @param {*} count - subscript under which this response is filed
 * @param {string} data - the value to store (callers in this file pass JSON text)
 */
function saveResponse(ix, count, data) {
  var responsePath = ['message', ix, 'response', count];
  var responseNode = {
    global: this.resilientMode.documentName,
    subscripts: responsePath,
    data: data
  };
  var ignoreOutcome = function(error, result) {};
  this.db.set(responseNode, ignoreOutcome);
}
143
/**
 * Schedules deletion of the ['pending', token, ix] record from the queue
 * backup document.
 *
 * The this.db.kill call is made from a 1000 ms timer rather than immediately.
 *
 * @param {string} token - token the pending record is filed under
 * @param {string} ix - index of the message whose pending record is removed
 */
function removePendingIndex(token, ix) {
  var self = this;
  var pendingNode = {
    global: self.resilientMode.documentName,
    subscripts: ['pending', token, ix]
  };

  function deletePendingNode() {
    self.db.kill(pendingNode, function(error, result) {});
  }

  // delay it a bit to ensure this happens after the pending index is saved
  setTimeout(deletePendingNode, 1000);
}
155
/**
 * Records a worker status against a stored message by assigning status to
 * the value of the ['message', messageObj.dbIndex, 'workerStatus'] node of
 * the document named in this.userDefined.config.resilientMode.documentName.
 *
 * @param {Object} messageObj - the message; its dbIndex property is read
 * @param {*} status - the status value to store
 */
function storeWorkerStatusUpdate(messageObj, status) {
  var DocumentNode = this.documentStore.DocumentNode;
  var documentName = this.userDefined.config.resilientMode.documentName;
  var statusPath = ["message", messageObj.dbIndex, "workerStatus"];

  var statusNode = new DocumentNode(documentName, statusPath);
  statusNode.value = status;
}
160
161function cleardownQueueBackup() {
162 var queue = new this.documentStore.DocumentNode(this.userDefined.config.resilientMode.documentName);
163 var messages = queue.$('message');
164 var pending = queue.$('pending');
165
166 var now = new Date().getTime();
167 var diff = this.userDefined.config.resilientMode.keepPeriod;
168 diff = diff * 1000;
169 var cutOff = now - diff;
170
171 console.log(process.pid + ': Clearing down queue backup document, up to ' + (diff /1000) + ' seconds ago');
172
173 messages.forEachChild(function(dbIndex, messageObj) {
174 var timestamp = parseInt(dbIndex.split('-')[0]);
175 if (timestamp < cutOff) {
176 var token = messageObj.$('token').value;
177 if (!pending.$(token).$(timestamp).exists) {
178 messageObj.delete();
179 console.log(process.pid + ': ' + dbIndex + ' deleted');
180 }
181 }
182 else {
183 return true; // cutoff reached, so stop checking the queue backup document
184 }
185 });
186
187 console.log(process.pid + ': Queue backup document cleared down');
188}
189
/**
 * Starts a repeating timer that runs cleardownQueueBackup against this
 * object, and registers a 'stop' handler (via this.on) that cancels it.
 *
 * @param {number} [delay] - interval in seconds; when delay * 1000 is falsy
 *   (delay omitted, 0 or not a number) the interval is 300000 ms (5 minutes)
 */
function garbageCollector(delay) {
  var self = this;
  var intervalMs = delay * 1000 || 300000; // every 5 minutes
  var timer;

  var onStop = function() {
    clearInterval(timer);
    console.log('Queue Backup Garbage Collector has stopped');
  };
  self.on('stop', onStop);

  var runCleardown = function() {
    cleardownQueueBackup.call(self);
  };
  timer = setInterval(runCleardown, intervalMs);

  console.log('Queue Backup Garbage Collector has started in worker ' + process.pid);
}
207
// Functions made available to other modules. Each of them reads state from
// `this` (e.g. this.db, this.resilientMode or this.documentStore), so callers
// need to invoke them with a suitable context object.
var resilientModeApi = {};
resilientModeApi.storeResponse = storeResponse;
resilientModeApi.storeIncomingMessage = storeIncomingMessage;
resilientModeApi.storeWorkerStatusUpdate = storeWorkerStatusUpdate;
resilientModeApi.garbageCollector = garbageCollector;

module.exports = resilientModeApi;
\No newline at end of file