1 |
|
2 | import kefir from 'kefir';
|
3 | import once from 'lodash/fp/once';
|
4 | import range from 'lodash/fp/range';
|
5 |
|
6 | import type { SubscribeConfig } from '../types/Config.type';
|
7 | import { createClient } from './redis-client';
|
8 | import { query } from './query';
|
9 |
|
/**
 * Decodes a raw message body as JSON.
 *
 * Returns whatever `JSON.parse` yields for `raw`. When parsing throws, the
 * untouched input is wrapped in a `{ type: '@@RAW', payload }` envelope
 * instead of propagating the error.
 */
const tryParse = raw => {
  let decoded;

  try {
    decoded = JSON.parse(raw);
  } catch (parseError) {
    // Not valid JSON: hand the original text back inside a marker event.
    decoded = { type: '@@RAW', payload: raw };
  }

  return decoded;
};
|
17 |
|
/**
 * Calls `response.flush()` when the response offers a usable `flush` method.
 *
 * Nothing happens when `flush` is missing/falsy or when the function's `name`
 * is `'deprecated'`.
 * NOTE(review): the name check presumably skips a deprecation-wrapped
 * built-in `flush` — confirm against the supported Node versions.
 */
function flush(response) {
  const candidate = response.flush;

  if (!candidate || candidate.name === 'deprecated') {
    return;
  }

  // Invoke through the response object so `this` stays bound to it.
  response.flush();
}
|
23 |
|
/**
 * Serialises a non-empty batch of events into one event-stream block.
 *
 * The block carries the `id` of the last event in the batch, the fixed event
 * name `INCMSG`, the whole batch as JSON in `data`, and ends with a blank
 * line. `events` must contain at least one element.
 */
const toOutput = events => {
  const newest = events[events.length - 1];
  const lines = [
    `id: ${newest.id}`,
    'event: INCMSG',
    `data: ${JSON.stringify(events)}`,
    '',
    '',
  ];

  // The two trailing empty entries yield the terminating "\n\n".
  return lines.join('\n');
};
|
29 |
|
/**
 * Creates the event-stream subscription service for one Redis channel.
 *
 * Two clients are created from `redis`: `subClient` subscribes to the
 * `${namespc}::events` channel and every received message is kept in an
 * in-memory history of at most `history.size` entries; `queryClient` is
 * handed to `query` to look up events older than that history.
 *
 * Config fields used here:
 * - `redis`   — passed as-is to `createClient`.
 * - `history` — `history.size` caps the in-memory history.
 * - `debug`   — when truthy, progress is logged to the console.
 * - `namespc` — channel-name prefix, also forwarded to `query`.
 * - `burst`   — `burst.time` / `burst.count` are given to
 *               `bufferWithTimeOrCount` to batch outgoing events.
 *
 * @returns `service(req, res)`, which attaches an HTTP request/response pair
 *   as an event-stream client, and `unsubscribe()`, which unsubscribes
 *   `subClient` from the channel.
 */
export default ({ redis, history, debug, namespc, burst }: SubscribeConfig) => {
  const subClient = createClient(redis);
  const queryClient = createClient(redis);

  const MAX_SIZE = history.size;
  // Event ids, newest first; `cache` maps each listed id to its event.
  const list = [];
  const cache = {};

  /** Records `message` as the newest event and evicts entries beyond MAX_SIZE. */
  const addToCache = message => {
    debug && console.log('adding to cache', message.id);

    const newSize = list.unshift(message.id);
    cache[message.id] = message;

    if (newSize > MAX_SIZE) {
      // Everything from index MAX_SIZE onwards is the oldest part of the list.
      const expired = list.splice(MAX_SIZE);
      expired.forEach(removingId => {
        delete cache[removingId];
      });
    }

    debug && console.log('HISTORY', list);
    debug && console.log('CACHE\n', cache);
  };

  // Every channel message, decoded. The text before the first ':' is read as
  // the numeric event id; the remainder is parsed with `tryParse`.
  const message$ = kefir
    .fromEvents(subClient, 'message', (channel, message) => message)
    .map(message => {
      const rawId = message.split(':')[0];
      const id = Number(rawId);
      const rawEvent = message.slice(rawId.length + 1);

      // NOTE(review): a parsed payload that carries its own `id` overrides
      // the sequence id here — confirm that payloads never contain `id`.
      return {
        id,
        ...tryParse(rawEvent),
      };
    })
    .onValue(addToCache);

  /**
   * Streams `src` to `res` as an event stream until the connection goes away.
   *
   * Writes the response head and an `:ok` comment, optionally a `retry` line
   * (when `opts.retry` is a non-zero integer), then forwards batched events.
   * `match` is accepted but not used.
   */
  const addClient = (src, opts, match, req, res) => {
    req.socket.setTimeout(0);
    req.socket.setNoDelay(true);
    req.socket.setKeepAlive(true);

    res.writeHead(200, {
      'Content-Type': 'text/event-stream;charset=UTF-8',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
    });

    res.write(':ok\n\n');

    const retry = opts.retry | 0;

    if (retry) {
      res.write('retry: ' + retry + '\n');
    }

    flush(res);

    const subscription = src
      // Replayed ranges may contain holes (ids no longer cached); drop them.
      .filter(x => x)
      .bufferWithTimeOrCount(burst.time, burst.count)
      .filter(b => b.length)
      .map(toOutput)
      .observe(block => {
        debug && console.log('send to %s %s', req.url, block);
        res.write(block);
        // Flush after every block, not only after the preamble, so that a
        // buffering layer exposing `res.flush` does not hold events back.
        flush(res);
      });

    // Several events can signal the end of the connection; tear down once.
    const removeClient = once(() => {
      debug && console.log('removing', req.url);
      subscription.unsubscribe();
      res.end();
    });

    req.on('end', removeClient);
    req.on('close', removeClient);
    res.on('finish', removeClient);
  };

  /** Cached events for ids `firstId..lastId` inclusive (missing ids give `undefined`). */
  const cachedRange = (firstId, lastId) =>
    range(firstId, lastId + 1).map(id => cache[id]);

  /**
   * Builds the replay stream for a client that last saw `lastEventId`.
   *
   * - No usable `lastEventId` (0, NaN, undefined): nothing is replayed.
   * - History reaches back far enough: replay comes from the cache only.
   * - Otherwise: the missing older part is requested through `query`,
   *   followed by the cached events.
   */
  const getInitialValues = lastEventId => {
    if (!lastEventId) {
      return kefir.never();
    }

    const current = list[0];
    const oldestInCache = list[list.length - 1];

    debug && console.log('current', current);
    if (oldestInCache <= lastEventId + 1) {
      return kefir.constant(cachedRange(lastEventId + 1, current)).flatten();
    }

    // NOTE(review): with an empty history `oldestInCache` is undefined, so
    // the upper bound passed to `query` below is NaN — confirm how `query`
    // treats that.
    debug && console.error('too old, getting more from redis');
    const fromRedis$ = kefir.fromPromise(
      query(queryClient, namespc, lastEventId + 1, oldestInCache - 1)
    );

    // Read the cache only when this stream is activated, i.e. after the
    // Redis lookup has finished. A snapshot taken before the lookup would
    // miss every message published while the query was in flight, and those
    // messages would never reach this client because the live stream is only
    // attached after the replay ends.
    const fromCache$ = kefir.stream(emitter => {
      emitter.value(cachedRange(oldestInCache, list[0]));
      emitter.end();
    });

    return kefir.concat([fromRedis$, fromCache$]).flatten();
  };

  /**
   * HTTP handler: replays events missed since the `last-event-id` request
   * header (if any) and then keeps streaming live messages to `res`.
   * Any exception is logged and rethrown.
   */
  const service = async (req: any, res: any) => {
    try {
      debug && console.log('connected', req.url);
      const lastEventId = Number(req.headers['last-event-id']);

      const initialValues = getInitialValues(lastEventId);

      debug && console.log('lastEventId', lastEventId);

      addClient(
        kefir.concat([initialValues, message$]),
        {},
        {},
        req,
        res
      );
    } catch (ex) {
      console.log(ex);
      throw ex;
    }
  };

  subClient.subscribe(`${namespc}::events`);

  return {
    service,
    unsubscribe: () => {
      console.log('unsubscribing from redis');
      subClient.unsubscribe(`${namespc}::events`);
    },
  };
};
|