UNPKG

5.74 kBJavaScriptView Raw
1'use strict';
2
3Object.defineProperty(exports, "__esModule", {
4 value: true
5});
6
7var _extends = Object.assign || function (target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i]; for (var key in source) { if (Object.prototype.hasOwnProperty.call(source, key)) { target[key] = source[key]; } } } return target; };
8
9var _kefir = require('kefir');
10
11var _kefir2 = _interopRequireDefault(_kefir);
12
13var _once = require('lodash/fp/once');
14
15var _once2 = _interopRequireDefault(_once);
16
17var _range = require('lodash/fp/range');
18
19var _range2 = _interopRequireDefault(_range);
20
21var _redisClient = require('./redis-client');
22
23var _query = require('./query');
24
25function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
26
27function _asyncToGenerator(fn) { return function () { var gen = fn.apply(this, arguments); return new Promise(function (resolve, reject) { function step(key, arg) { try { var info = gen[key](arg); var value = info.value; } catch (error) { reject(error); return; } if (info.done) { resolve(value); } else { return Promise.resolve(value).then(function (value) { step("next", value); }, function (err) { step("throw", err); }); } } return step("next"); }); }; }
28
29var tryParse = function tryParse(raw) {
30 try {
31 return JSON.parse(raw);
32 } catch (ex) {
33 return { type: '@@RAW', payload: raw };
34 }
35};
36
37function flush(response) {
38 if (response.flush && response.flush.name !== 'deprecated') {
39 response.flush();
40 }
41}
42
43var toOutput = function toOutput(events) {
44 return 'id: ' + events[events.length - 1].id + '\nevent: INCMSG\ndata: ' + JSON.stringify(events) + '\n\n';
45};
46
47exports.default = function (_ref) {
48 var redis = _ref.redis,
49 history = _ref.history,
50 debug = _ref.debug,
51 namespc = _ref.namespc,
52 burst = _ref.burst;
53
54 var subClient = (0, _redisClient.createClient)(redis);
55 var queryClient = (0, _redisClient.createClient)(redis);
56
57 // prepare cache
58 var MAX_SIZE = history.size;
59 var list = [];
60 var cache = {};
61
62 var addToCache = function addToCache(message) {
63 debug && console.log('adding to cache', message.id);
64
65 var newSize = list.unshift(message.id);
66 cache[message.id] = message;
67
68 if (newSize > MAX_SIZE) {
69 var expired = list.splice(MAX_SIZE);
70 expired.forEach(function (removingId) {
71 delete cache[removingId];
72 });
73 }
74
75 debug && console.log('HISTORY', list);
76 debug && console.log('CACHE\n', cache);
77 };
78
79 var message$ = _kefir2.default.fromEvents(subClient, 'message', function (channel, message) {
80 return message;
81 }).map(function (message) {
82 var rawId = message.split(':')[0];
83 var id = Number(rawId);
84 var rawEvent = message.slice(rawId.length + 1);
85
86 return _extends({
87 id: id
88 }, tryParse(rawEvent));
89 }).onValue(addToCache);
90
91 var addClient = function addClient(src, opts, match, req, res) {
92 req.socket.setTimeout(0);
93 req.socket.setNoDelay(true);
94 req.socket.setKeepAlive(true);
95
96 res.writeHead(200, {
97 'Content-Type': 'text/event-stream;charset=UTF-8',
98 'Cache-Control': 'no-cache',
99 Connection: 'keep-alive'
100 });
101
102 res.write(':ok\n\n');
103
104 // cast to integer
105 var retry = opts.retry | 0;
106
107 if (retry) {
108 res.write('retry: ' + retry + '\n');
109 }
110
111 flush(res);
112
113 var subscription = src.filter(function (x) {
114 return x;
115 }).bufferWithTimeOrCount(burst.time, burst.count).filter(function (b) {
116 return b.length;
117 }).map(toOutput).observe(function (block) {
118 debug && console.log('send to %s %s', req.url, block);
119 res.write(block);
120 });
121
122 var removeClient = (0, _once2.default)(function () {
123 debug && console.log('removing', req.url);
124 subscription.unsubscribe();
125 res.end();
126 });
127
128 req.on('end', removeClient);
129 req.on('close', removeClient);
130 res.on('finish', removeClient);
131 };
132
133 var getInitialValues = function getInitialValues(lastEventId) {
134 if (!lastEventId) {
135 return _kefir2.default.never();
136 }
137
138 // history[0] is the latest id
139 var current = list[0];
140 var oldestInCache = list[list.length - 1];
141
142 debug && console.log('current', current);
143 if (oldestInCache <= lastEventId + 1) {
144 return _kefir2.default.constant((0, _range2.default)(lastEventId + 1, current + 1).map(function (id) {
145 return cache[id];
146 })).flatten();
147 }
148
149 debug && console.error('too old, getting more from redis');
150 var fromCache$ = _kefir2.default.constant((0, _range2.default)(oldestInCache, current + 1).map(function (id) {
151 return cache[id];
152 }));
153
154 var fromRedis$ = _kefir2.default.fromPromise((0, _query.query)(queryClient, namespc, lastEventId + 1, oldestInCache - 1));
155
156 return _kefir2.default.concat([fromRedis$, fromCache$]).flatten();
157 };
158
159 var service = function () {
160 var _ref2 = _asyncToGenerator(function* (req, res) {
161 try {
162 debug && console.log('connected', req.url);
163 var lastEventId = Number(req.headers['last-event-id']);
164
165 var initialValues = getInitialValues(lastEventId);
166
167 debug && console.log('lastEventId', lastEventId);
168
169 // const url = req.url;
170 // const query = parse(url.split('?')[1]);
171
172 addClient(_kefir2.default.concat([initialValues, message$]), {}, {}, req, res);
173 } catch (ex) {
174 console.log(ex);
175 throw ex;
176 }
177 });
178
179 return function service(_x, _x2) {
180 return _ref2.apply(this, arguments);
181 };
182 }();
183
184 subClient.subscribe(namespc + '::events');
185
186 return {
187 service: service,
188 unsubscribe: function unsubscribe() {
189 console.log('unsubscribing from redis');
190 subClient.unsubscribe(namespc + '::events');
191 }
192 };
193};
\No newline at end of file