UNPKG

4.24 kBJavaScriptView Raw
1/* @flow */
2import kefir from 'kefir';
3import once from 'lodash/fp/once';
4import range from 'lodash/fp/range';
5
6import type { SubscribeConfig } from '../types/Config.type';
7import { createClient } from './redis-client';
8import { query } from './query';
9
10const tryParse = raw => {
11 try {
12 return JSON.parse(raw);
13 } catch (ex) {
14 return { type: '@@RAW', payload: raw };
15 }
16};
17
18function flush(response) {
19 if (response.flush && response.flush.name !== 'deprecated') {
20 response.flush();
21 }
22}
23
24const toOutput = events => `id: ${events[events.length - 1].id}
25event: INCMSG
26data: ${JSON.stringify(events)}
27
28`;
29
30export default ({ redis, history, debug, namespc, burst }: SubscribeConfig) => {
31 const subClient = createClient(redis);
32 const queryClient = createClient(redis);
33
34 // prepare cache
35 const MAX_SIZE = history.size;
36 const list = [];
37 const cache = {};
38
39 const addToCache = message => {
40 debug && console.log('adding to cache', message.id);
41
42 const newSize = list.unshift(message.id);
43 cache[message.id] = message;
44
45 if (newSize > MAX_SIZE) {
46 const expired = list.splice(MAX_SIZE);
47 expired.forEach(removingId => {
48 delete cache[removingId];
49 });
50 }
51
52 debug && console.log('HISTORY', list);
53 debug && console.log('CACHE\n', cache);
54 };
55
56 const message$ = kefir
57 .fromEvents(subClient, 'message', (channel, message) => message)
58 .map(message => {
59 const rawId = message.split(':')[0];
60 const id = Number(rawId);
61 const rawEvent = message.slice(rawId.length + 1);
62
63 return {
64 id,
65 ...tryParse(rawEvent),
66 };
67 })
68 .onValue(addToCache);
69
70 const addClient = (src, opts, match, req, res) => {
71 req.socket.setTimeout(0);
72 req.socket.setNoDelay(true);
73 req.socket.setKeepAlive(true);
74
75 res.writeHead(200, {
76 'Content-Type': 'text/event-stream;charset=UTF-8',
77 'Cache-Control': 'no-cache',
78 Connection: 'keep-alive',
79 });
80
81 res.write(':ok\n\n');
82
83 // cast to integer
84 const retry = opts.retry | 0;
85
86 if (retry) {
87 res.write('retry: ' + retry + '\n');
88 }
89
90 flush(res);
91
92 const subscription = src
93 .filter(x => x)
94 .bufferWithTimeOrCount(burst.time, burst.count)
95 .filter(b => b.length)
96 .map(toOutput)
97 .observe(block => {
98 debug && console.log('send to %s %s', req.url, block);
99 res.write(block);
100 });
101
102 const removeClient = once(() => {
103 debug && console.log('removing', req.url);
104 subscription.unsubscribe();
105 res.end();
106 });
107
108 req.on('end', removeClient);
109 req.on('close', removeClient);
110 res.on('finish', removeClient);
111 };
112
113 const getInitialValues = lastEventId => {
114 if (!lastEventId) {
115 return kefir.never();
116 }
117
118 // history[0] is the latest id
119 const current = list[0];
120 const oldestInCache = list[list.length - 1];
121
122 debug && console.log('current', current);
123 if (oldestInCache <= lastEventId + 1) {
124 return kefir
125 .constant(range(lastEventId + 1, current + 1).map(id => cache[id]))
126 .flatten();
127 }
128
129 debug && console.error('too old, getting more from redis');
130 const fromCache$ = kefir
131 .constant(range(oldestInCache, current + 1).map(id => cache[id]))
132
133 const fromRedis$ = kefir
134 .fromPromise(
135 query(queryClient, namespc, lastEventId + 1, oldestInCache - 1)
136 );
137
138 return kefir.concat([fromRedis$, fromCache$]).flatten();
139 };
140
141 const service = async (req: any, res: any) => {
142 try {
143 debug && console.log('connected', req.url);
144 const lastEventId = Number(req.headers['last-event-id']);
145
146 const initialValues = getInitialValues(lastEventId);
147
148 debug && console.log('lastEventId', lastEventId);
149
150 // const url = req.url;
151 // const query = parse(url.split('?')[1]);
152
153 addClient(
154 kefir.concat([initialValues, message$]),
155 {},
156 {},
157 req,
158 res
159 );
160 } catch (ex) {
161 console.log(ex);
162 throw ex;
163 }
164 };
165
166 subClient.subscribe(`${namespc}::events`);
167
168 return {
169 service,
170 unsubscribe: () => {
171 console.log('unsubscribing from redis');
172 subClient.unsubscribe(`${namespc}::events`);
173 },
174 };
175};