UNPKG

5.12 kBJavaScriptView Raw
1var http = require("http");
2var path = require("path");
3var request = require("request");
4var util = require("../util/util");
5
6/**
7 * Insight subsystem.
8 *
9 * When insight "push" requests come along, we handle them here and pass back to Cloud CMS.
10 */
11var exports = module.exports;
12
13exports.init = function(socket, callback)
14{
15 // listen for pushes from the client
16 socket.on("insight-push", function(data) {
17
18 console.log("Heard request for insight-push: " + data.rows.length);
19
20 if (process.configuration && process.configuration.insight && process.configuration.insight.enabled)
21 {
22 if (data && data.rows)
23 {
24 socket._log("Scheduling insight data rows: " + data.rows.length);
25
26 scheduleData(socket, data);
27 }
28 }
29 });
30
31 callback();
32};
33
34// pending data arrays keyed by warehouseId
35var PENDING_DATA = {};
36
37/**
38 * Data comes in and we schedule it for send to the Cloud CMS server.
39 *
40 * @param data
41 */
42var scheduleData = function(socket, data)
43{
44 var host = socket.handshake.headers['x-forwarded-host'] || socket.handshake.headers.host;
45
46 var gitana = socket.gitana;
47 if (!gitana)
48 {
49 return socket._log("Insight - the socket does not have a gitana instance attached to it, host: " + host + ", skipping...");
50 }
51
52 var warehouseId = data.warehouseId;
53 if (!warehouseId)
54 {
55 var application = gitana.application();
56 if (application)
57 {
58 warehouseId = application.warehouseId;
59 }
60 }
61
62 if (!warehouseId) {
63 return console.log("Insight - the application does not have a warehouseId, skipping...");
64 }
65
66 var ip = socket.handshake.headers['x-forwarded-for'] || socket.handshake.address.address;
67
68 // tag all rows with the "applicationKey" + ip + host
69 for (var i = 0; i < data.rows.length; i++)
70 {
71 data.rows[i].appKey = gitana.application().getId();
72
73 if (!data.rows[i].source) {
74 data.rows[i].source = {};
75 }
76
77 data.rows[i].source.ip = ip;
78 data.rows[i].source.host = host;
79 }
80
81 // apply into PENDING_DATA
82 if (!PENDING_DATA[warehouseId])
83 {
84 PENDING_DATA[warehouseId] = {
85 "rows": []
86 };
87 }
88
89 for (var i = 0; i < data.rows.length; i++)
90 {
91 PENDING_DATA[warehouseId].rows.push(data.rows[i]);
92 }
93
94 PENDING_DATA[warehouseId].gitana = gitana;
95 PENDING_DATA[warehouseId].log = socket._log;
96};
97
98var doSend = function(callback)
99{
100 // first find a warehouseId that has some rows
101 var warehouseId = null;
102
103 for (var k in PENDING_DATA)
104 {
105 if (PENDING_DATA[k] && PENDING_DATA[k].rows && PENDING_DATA[k].rows.length > 0)
106 {
107 warehouseId = k;
108 break;
109 }
110 }
111
112 if (!warehouseId)
113 {
114 // nothing to send
115 return callback();
116 }
117
118 var gitana = PENDING_DATA[warehouseId].gitana;
119 var log = PENDING_DATA[warehouseId].log;
120
121 // the data that we will send
122 var data = {
123 "rows": []
124 };
125
126 // move any PENDING_DATA for this warehouse into data.rows
127 // this cuts down the PENDING_DATA array to size 0
128 // and increases the size of data.rows
129 while (PENDING_DATA[warehouseId].rows.length > 0) {
130 data.rows.push(PENDING_DATA[warehouseId].rows.splice(0, 1)[0]);
131 }
132
133 // url over to cloud cms
134 var URL = util.asURL(process.env.GITANA_PROXY_SCHEME, process.env.GITANA_PROXY_HOST, process.env.GITANA_PROXY_PORT) + "/warehouses/" + warehouseId + "/interactions/_create";
135 var requestConfig = {
136 "url": URL,
137 "qs": {},
138 "method": "POST",
139 "json": data
140 };
141
142 console.log("Insight sync for warehouse: " + warehouseId + ", pushing rows: " + data.rows.length);
143 console.log(" -> url: " + URL);
144 console.log(" -> data: " + JSON.stringify(data));
145
146 // make a single attempt to send the data over
147 // if it fails, we add it back to the queue
148 util.retryGitanaRequest(log, gitana, requestConfig, 1, function(err, response, body) {
149
150 if (response && response.statusCode === 200 && body)
151 {
152 console.log("Insight sync for warehouse: " + warehouseId + " succeeded");
153 }
154 else
155 {
156 if (err || (body && body.error))
157 {
158 console.log("Insight sync for warehouse: " + warehouseId + " failed");
159
160 if (err) {
161 console.log(" -> err: " + JSON.stringify(err));
162 }
163
164 /*
165 // copy data.rows back into queue
166 for (var z = 0; z < data.rows.length; z++)
167 {
168 PENDING_DATA[warehouseId].rows.push(data.rows[z]);
169 }
170 */
171
172 if (body && body.error)
173 {
174 console.log(" -> body: " + JSON.stringify(body));
175 }
176 }
177 }
178
179 callback();
180 });
181};
182
183var f = function() {
184 setTimeout(function () {
185 doSend(function() {
186 f();
187 });
188 }, 250);
189};
190f();
191