UNPKG

2.86 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.SseStream = void 0;
4const stream_1 = require("stream");
/**
 * Serialises a message payload into one or more SSE `data:` fields.
 *
 * Objects (including arrays and `null`) are JSON-encoded first. Any other
 * value is converted to a string, then split on CR, LF or CRLF so that every
 * physical line gets its own `data: ` prefix, as the event-stream format
 * requires for multi-line payloads.
 *
 * @param {*} data Payload to serialise.
 * @returns {string} Concatenated `data: <line>\n` fields (no trailing blank line).
 */
function toDataString(data) {
    if (typeof data === 'object') {
        return toDataString(JSON.stringify(data));
    }
    // Coerce primitives (numbers, booleans, ...) so they do not blow up on
    // `.split`, which only exists on strings.
    return String(data)
        .split(/\r\n|\r|\n/)
        .map(line => `data: ${line}\n`)
        .join('');
}
/**
 * Adapted from https://raw.githubusercontent.com/EventSource/node-ssestream
 * Transforms "messages" to W3C event stream content.
 * See https://html.spec.whatwg.org/multipage/server-sent-events.html
 * A message is an object with one or more of the following properties:
 * - data (String or object, which gets turned into JSON)
 * - type
 * - id
 * - retry
 *
 * Properties holding a falsy value are left out of the serialised event.
 *
 * If constructed with a HTTP Request, it will optimise the socket for streaming.
 * If this stream is piped to an HTTP Response, it will set appropriate headers.
 */
class SseStream extends stream_1.Transform {
    /**
     * @param {object} [req] Optional HTTP request. When it exposes a `socket`,
     *   keep-alive is enabled, Nagle's algorithm is disabled and the idle
     *   timeout is switched off on that socket.
     */
    constructor(req) {
        super({ objectMode: true });
        // Counter behind auto-generated event ids; the first generated id is "1".
        this.lastEventId = 0;
        if (req && req.socket) {
            req.socket.setKeepAlive(true);
            req.socket.setNoDelay(true);
            req.socket.setTimeout(0);
        }
    }
    /**
     * Pipes the event stream into `destination`.
     *
     * When the destination has a `writeHead` method, a 200 status with the
     * event-stream headers is written and flushed first. A comment line
     * (`:\n`) is then written before delegating to `Transform#pipe`.
     *
     * @param {object} destination Writable target, typically an HTTP response.
     * @param {object} [options] Forwarded unchanged to `Transform#pipe`.
     * @returns {object} Whatever `Transform#pipe` returns.
     */
    pipe(destination, options) {
        if (destination.writeHead) {
            destination.writeHead(200, {
                // See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
                'Content-Type': 'text/event-stream',
                Connection: 'keep-alive',
                // Disable cache, even for old browsers and proxies
                'Cache-Control': 'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
                Pragma: 'no-cache',
                // The standard header name is "Expires"; "Expire" is not a
                // recognised caching header.
                Expires: '0',
                // NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
                'X-Accel-Buffering': 'no',
            });
            destination.flushHeaders();
        }
        destination.write(':\n');
        return super.pipe(destination, options);
    }
    /**
     * Serialises one message object into event-stream text.
     *
     * Fields are emitted in the order `event`, `id`, `retry`, `data`, each
     * only when the matching property is truthy, followed by the blank line
     * that terminates the event.
     *
     * @param {object} message Message with optional `type`, `id`, `retry`, `data`.
     * @param {string} encoding Unused; present to satisfy the Transform contract.
     * @param {Function} callback Invoked once the serialised chunk has been pushed.
     */
    _transform(message, encoding, callback) {
        let data = message.type ? `event: ${message.type}\n` : '';
        data += message.id ? `id: ${message.id}\n` : '';
        data += message.retry ? `retry: ${message.retry}\n` : '';
        data += message.data ? toDataString(message.data) : '';
        data += '\n';
        this.push(data);
        callback();
    }
    /**
     * Calls `.write` but handles the drain if needed.
     *
     * A message without a truthy `id` is assigned the next sequential id
     * (the message object itself is updated).
     *
     * @param {object} message Message to enqueue.
     * @param {Function} [cb] Called exactly once: on the next tick when the
     *   stream accepted the write, otherwise on the next `drain` event.
     */
    writeMessage(message, cb) {
        if (!message.id) {
            this.lastEventId++;
            message.id = this.lastEventId.toString();
        }
        const done = typeof cb === 'function' ? cb : () => { };
        // The callback is deliberately NOT handed to write() as well:
        // doing so made it fire twice for every message.
        if (!this.write(message, 'utf-8')) {
            this.once('drain', done);
        }
        else {
            process.nextTick(done);
        }
    }
}
80exports.SseStream = SseStream;