1 | 'use strict';
|
2 |
|
3 | Object.defineProperty(exports, "__esModule", {
|
4 | value: true
|
5 | });
|
6 |
|
/**
 * Fallback used when the runtime has no native `Object.assign`:
 * copies every own enumerable property of each source argument onto `target`
 * (later sources win) and returns `target`.
 */
function _assignOwnProperties(target) {
  for (var idx = 1; idx < arguments.length; idx += 1) {
    var src = arguments[idx];

    for (var prop in src) {
      // for...in also walks inherited keys; keep own properties only.
      if (Object.prototype.hasOwnProperty.call(src, prop)) {
        target[prop] = src[prop];
      }
    }
  }

  return target;
}

// Prefer the native implementation, otherwise use the manual copy above.
var _extends = Object.assign || _assignOwnProperties;
|
8 |
|
9 | var _kefir = require('kefir');
|
10 |
|
11 | var _kefir2 = _interopRequireDefault(_kefir);
|
12 |
|
13 | var _once = require('lodash/fp/once');
|
14 |
|
15 | var _once2 = _interopRequireDefault(_once);
|
16 |
|
17 | var _range = require('lodash/fp/range');
|
18 |
|
19 | var _range2 = _interopRequireDefault(_range);
|
20 |
|
21 | var _redisClient = require('./redis-client');
|
22 |
|
23 | var _query = require('./query');
|
24 |
|
/**
 * Normalise a required module so callers can always read `.default`.
 *
 * @param {*} obj - value returned by `require`.
 * @returns {*} `obj` itself when it is flagged `__esModule`, otherwise a
 *   wrapper object `{ default: obj }`.
 */
function _interopRequireDefault(obj) {
  if (obj && obj.__esModule) {
    return obj;
  }

  return { default: obj };
}
|
26 |
|
/**
 * Wrap a generator function so it behaves like an async function.
 *
 * The returned function starts the generator (forwarding `this` and the
 * arguments) and returns a Promise. Every yielded value is passed through
 * `Promise.resolve`; its fulfilment is sent back with `gen.next(value)` and
 * its rejection with `gen.throw(err)`. The Promise resolves with the
 * generator's return value and rejects with any error the generator throws.
 *
 * @param {GeneratorFunction} fn - generator function to drive.
 * @returns {Function} function returning a Promise.
 */
function _asyncToGenerator(fn) {
  return function () {
    var gen = fn.apply(this, arguments);

    return new Promise(function (resolve, reject) {
      // Resume the generator with the settled value of the last yield.
      function onFulfilled(settled) {
        step('next', settled);
      }

      // Re-throw the rejection inside the generator so it can try/catch it.
      function onRejected(err) {
        step('throw', err);
      }

      function step(key, arg) {
        var info;
        var value;

        try {
          info = gen[key](arg);
          value = info.value;
        } catch (error) {
          // The generator threw and did not handle it: reject the outer Promise.
          reject(error);
          return;
        }

        if (info.done) {
          resolve(value);
          return;
        }

        return Promise.resolve(value).then(onFulfilled, onRejected);
      }

      return step('next');
    });
  };
}
|
28 |
|
/**
 * Parse `raw` as JSON.
 *
 * @param {string} raw - text to parse.
 * @returns {*} the parsed JSON value, or `{ type: '@@RAW', payload: raw }`
 *   when `raw` is not valid JSON.
 */
var tryParse = function tryParse(raw) {
  var parsed;

  try {
    parsed = JSON.parse(raw);
  } catch (ex) {
    // Not JSON: hand the original text through under a marker type.
    parsed = { type: '@@RAW', payload: raw };
  }

  return parsed;
};
|
36 |
|
/**
 * Call `response.flush()` when the response offers a usable one.
 *
 * Nothing happens when `response.flush` is missing/falsy or when the function
 * stored there is named `deprecated`.
 *
 * @param {Object} response - response-like object, optionally with `flush`.
 */
function flush(response) {
  var candidate = response.flush;

  if (!candidate || candidate.name === 'deprecated') {
    return;
  }

  response.flush();
}
|
42 |
|
/**
 * Serialise a batch of events into one server-sent-events frame.
 *
 * The frame carries the id of the LAST event in the batch, the fixed event
 * name `INCMSG`, and the whole batch JSON-encoded as data, terminated by a
 * blank line.
 *
 * @param {Array<Object>} events - non-empty batch; each item has an `id`.
 * @returns {string} the frame text.
 */
var toOutput = function toOutput(events) {
  var newest = events[events.length - 1];
  var frameLines = [
    'id: ' + newest.id,
    'event: INCMSG',
    'data: ' + JSON.stringify(events),
    // Two trailing empty entries produce the "\n\n" that ends the frame.
    '',
    ''
  ];

  return frameLines.join('\n');
};
|
46 |
|
47 | exports.default = function (_ref) {
|
48 | var redis = _ref.redis,
|
49 | history = _ref.history,
|
50 | debug = _ref.debug,
|
51 | namespc = _ref.namespc,
|
52 | burst = _ref.burst;
|
53 |
|
54 | var subClient = (0, _redisClient.createClient)(redis);
|
55 | var queryClient = (0, _redisClient.createClient)(redis);
|
56 |
|
57 |
|
58 | var MAX_SIZE = history.size;
|
59 | var list = [];
|
60 | var cache = {};
|
61 |
|
62 | var addToCache = function addToCache(message) {
|
63 | debug && console.log('adding to cache', message.id);
|
64 |
|
65 | var newSize = list.unshift(message.id);
|
66 | cache[message.id] = message;
|
67 |
|
68 | if (newSize > MAX_SIZE) {
|
69 | var expired = list.splice(MAX_SIZE);
|
70 | expired.forEach(function (removingId) {
|
71 | delete cache[removingId];
|
72 | });
|
73 | }
|
74 |
|
75 | debug && console.log('HISTORY', list);
|
76 | debug && console.log('CACHE\n', cache);
|
77 | };
|
78 |
|
79 | var message$ = _kefir2.default.fromEvents(subClient, 'message', function (channel, message) {
|
80 | return message;
|
81 | }).map(function (message) {
|
82 | var rawId = message.split(':')[0];
|
83 | var id = Number(rawId);
|
84 | var rawEvent = message.slice(rawId.length + 1);
|
85 |
|
86 | return _extends({
|
87 | id: id
|
88 | }, tryParse(rawEvent));
|
89 | }).onValue(addToCache);
|
90 |
|
91 | var addClient = function addClient(src, opts, match, req, res) {
|
92 | req.socket.setTimeout(0);
|
93 | req.socket.setNoDelay(true);
|
94 | req.socket.setKeepAlive(true);
|
95 |
|
96 | res.writeHead(200, {
|
97 | 'Content-Type': 'text/event-stream;charset=UTF-8',
|
98 | 'Cache-Control': 'no-cache',
|
99 | Connection: 'keep-alive'
|
100 | });
|
101 |
|
102 | res.write(':ok\n\n');
|
103 |
|
104 |
|
105 | var retry = opts.retry | 0;
|
106 |
|
107 | if (retry) {
|
108 | res.write('retry: ' + retry + '\n');
|
109 | }
|
110 |
|
111 | flush(res);
|
112 |
|
113 | var subscription = src.filter(function (x) {
|
114 | return x;
|
115 | }).bufferWithTimeOrCount(burst.time, burst.count).filter(function (b) {
|
116 | return b.length;
|
117 | }).map(toOutput).observe(function (block) {
|
118 | debug && console.log('send to %s %s', req.url, block);
|
119 | res.write(block);
|
120 | });
|
121 |
|
122 | var removeClient = (0, _once2.default)(function () {
|
123 | debug && console.log('removing', req.url);
|
124 | subscription.unsubscribe();
|
125 | res.end();
|
126 | });
|
127 |
|
128 | req.on('end', removeClient);
|
129 | req.on('close', removeClient);
|
130 | res.on('finish', removeClient);
|
131 | };
|
132 |
|
133 | var getInitialValues = function getInitialValues(lastEventId) {
|
134 | if (!lastEventId) {
|
135 | return _kefir2.default.never();
|
136 | }
|
137 |
|
138 |
|
139 | var current = list[0];
|
140 | var oldestInCache = list[list.length - 1];
|
141 |
|
142 | debug && console.log('current', current);
|
143 | if (oldestInCache <= lastEventId + 1) {
|
144 | return _kefir2.default.constant((0, _range2.default)(lastEventId + 1, current + 1).map(function (id) {
|
145 | return cache[id];
|
146 | })).flatten();
|
147 | }
|
148 |
|
149 | debug && console.error('too old, getting more from redis');
|
150 | var fromCache$ = _kefir2.default.constant((0, _range2.default)(oldestInCache, current + 1).map(function (id) {
|
151 | return cache[id];
|
152 | }));
|
153 |
|
154 | var fromRedis$ = _kefir2.default.fromPromise((0, _query.query)(queryClient, namespc, lastEventId + 1, oldestInCache - 1));
|
155 |
|
156 | return _kefir2.default.concat([fromRedis$, fromCache$]).flatten();
|
157 | };
|
158 |
|
159 | var service = function () {
|
160 | var _ref2 = _asyncToGenerator(function* (req, res) {
|
161 | try {
|
162 | debug && console.log('connected', req.url);
|
163 | var lastEventId = Number(req.headers['last-event-id']);
|
164 |
|
165 | var initialValues = getInitialValues(lastEventId);
|
166 |
|
167 | debug && console.log('lastEventId', lastEventId);
|
168 |
|
169 |
|
170 |
|
171 |
|
172 | addClient(_kefir2.default.concat([initialValues, message$]), {}, {}, req, res);
|
173 | } catch (ex) {
|
174 | console.log(ex);
|
175 | throw ex;
|
176 | }
|
177 | });
|
178 |
|
179 | return function service(_x, _x2) {
|
180 | return _ref2.apply(this, arguments);
|
181 | };
|
182 | }();
|
183 |
|
184 | subClient.subscribe(namespc + '::events');
|
185 |
|
186 | return {
|
187 | service: service,
|
188 | unsubscribe: function unsubscribe() {
|
189 | console.log('unsubscribing from redis');
|
190 | subClient.unsubscribe(namespc + '::events');
|
191 | }
|
192 | };
|
193 | }; |
\ | No newline at end of file |