UNPKG

2.99 kBJavaScriptView Raw
1"use strict";
2Object.defineProperty(exports, "__esModule", { value: true });
3exports.SseStream = void 0;
4const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
5const stream_1 = require("stream");
/**
 * Serializes a message payload into the `data:` lines of a server-sent event.
 *
 * Objects are JSON-encoded first. Every other value is converted with
 * `String()` so that numeric or boolean payloads are emitted instead of
 * throwing on a missing `.split` method. The text is then split on any line
 * terminator (CRLF, CR or LF) and each resulting line is emitted as its own
 * `data: <line>\n` field.
 *
 * @param {*} data Payload to serialize (string, object, or other primitive).
 * @returns {string} Concatenated `data:` lines, each terminated by `\n`.
 */
function toDataString(data) {
    if ((0, shared_utils_1.isObject)(data)) {
        return toDataString(JSON.stringify(data));
    }
    // Coerce non-string primitives (numbers, booleans, ...) before splitting.
    return String(data)
        .split(/\r\n|\r|\n/)
        .map(line => `data: ${line}\n`)
        .join('');
}
/**
 * Adapted from https://raw.githubusercontent.com/EventSource/node-ssestream
 * Transforms "messages" to W3C event stream content.
 * See https://html.spec.whatwg.org/multipage/server-sent-events.html
 * A message is an object with one or more of the following properties:
 * - data (String or object, which gets turned into JSON)
 * - type
 * - id
 * - retry
 *
 * If constructed with a HTTP Request, it will optimise the socket for streaming.
 * If this stream is piped to an HTTP Response, it will set appropriate headers.
 */
class SseStream extends stream_1.Transform {
    /**
     * @param {object} [req] Optional incoming request. When it exposes a
     *   `socket`, keep-alive and no-delay are switched on and the socket
     *   timeout is set to 0.
     */
    constructor(req) {
        super({ objectMode: true });
        // Starts as null; the first auto-generated id is therefore "1"
        // (see writeMessage, where the increment coerces null to 0).
        this.lastEventId = null;
        if (req && req.socket) {
            req.socket.setKeepAlive(true);
            req.socket.setNoDelay(true);
            req.socket.setTimeout(0);
        }
    }
    /**
     * Pipes the event stream into `destination`. When the destination has a
     * `writeHead` method, a 200 response with event-stream and anti-caching
     * headers is written and flushed first. A leading newline is always
     * written before piping starts.
     *
     * @param {object} destination Writable target (typically an HTTP response).
     * @param {object} [options] Pipe options; `additionalHeaders` are merged
     *   in first, so the fixed headers below take precedence over them.
     * @returns {object} Whatever the underlying `Transform#pipe` returns.
     */
    pipe(destination, options) {
        if (destination.writeHead) {
            const additionalHeaders = options == null ? undefined : options.additionalHeaders;
            const headers = Object.assign({}, additionalHeaders, {
                // See https://github.com/dunglas/mercure/blob/master/hub/subscribe.go#L124-L130
                'Content-Type': 'text/event-stream',
                Connection: 'keep-alive',
                // Disable cache, even for old browsers and proxies
                'Cache-Control': 'private, no-cache, no-store, must-revalidate, max-age=0, no-transform',
                Pragma: 'no-cache',
                // The standard header name is "Expires" (not "Expire").
                Expires: '0',
                // NGINX support https://www.nginx.com/resources/wiki/start/topics/examples/x-accel/#x-accel-buffering
                'X-Accel-Buffering': 'no',
            });
            destination.writeHead(200, headers);
            destination.flushHeaders();
        }
        destination.write('\n');
        return super.pipe(destination, options);
    }
    /**
     * Serializes one message into event-stream text and pushes it downstream.
     * Fields are emitted in the order `event`, `id`, `retry`, `data`, followed
     * by a blank line that terminates the event. Falsy field values are
     * omitted.
     *
     * @param {object} message Message with optional `type`, `id`, `retry`, `data`.
     * @param {string} encoding Unused; the stream runs in object mode.
     * @param {Function} callback Invoked once the message has been pushed.
     */
    _transform(message, encoding, callback) {
        let data = message.type ? `event: ${message.type}\n` : '';
        data += message.id ? `id: ${message.id}\n` : '';
        data += message.retry ? `retry: ${message.retry}\n` : '';
        data += message.data ? toDataString(message.data) : '';
        data += '\n';
        this.push(data);
        callback();
    }
    /**
     * Calls `.write` but handles the drain if needed.
     *
     * A message without an `id` is assigned the next sequential id (the
     * message object itself is updated). `cb` is invoked exactly once: on the
     * next tick when the write was accepted, or on the next `drain` event when
     * the internal buffer is full.
     *
     * @param {object} message Message to send.
     * @param {Function} [cb] Optional callback signalling that more messages
     *   may be written.
     */
    writeMessage(message, cb) {
        if (!message.id) {
            this.lastEventId++;
            message.id = this.lastEventId.toString();
        }
        // Do not hand `cb` to write() as well: it would then fire twice,
        // once from the write completion and once from the scheduling below.
        const accepted = this.write(message, 'utf-8');
        if (typeof cb !== 'function') {
            return;
        }
        if (accepted) {
            process.nextTick(cb);
        }
        else {
            this.once('drain', cb);
        }
    }
}
77exports.SseStream = SseStream;