1 | "use strict";
|
2 | Object.defineProperty(exports, "__esModule", { value: true });
|
3 | exports.SseStream = void 0;
|
4 | const shared_utils_1 = require("@nestjs/common/utils/shared.utils");
|
5 | const stream_1 = require("stream");
|
6 | function toDataString(data) {
|
7 | if ((0, shared_utils_1.isObject)(data)) {
|
8 | return toDataString(JSON.stringify(data));
|
9 | }
|
10 | return data
|
11 | .split(/\r\n|\r|\n/)
|
12 | .map(line => `data: ${line}\n`)
|
13 | .join('');
|
14 | }
|
15 |
|
16 |
|
17 |
|
18 |
|
19 |
|
20 |
|
21 |
|
22 |
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 | class SseStream extends stream_1.Transform {
|
29 | constructor(req) {
|
30 | super({ objectMode: true });
|
31 | this.lastEventId = null;
|
32 | if (req && req.socket) {
|
33 | req.socket.setKeepAlive(true);
|
34 | req.socket.setNoDelay(true);
|
35 | req.socket.setTimeout(0);
|
36 | }
|
37 | }
|
38 | pipe(destination, options) {
|
39 | if (destination.writeHead) {
|
40 | destination.writeHead(200, Object.assign(Object.assign({}, options === null || options === void 0 ? void 0 : options.additionalHeaders), {
|
41 |
|
42 | 'Content-Type': 'text/event-stream', Connection: 'keep-alive',
|
43 |
|
44 | 'Cache-Control': 'private, no-cache, no-store, must-revalidate, max-age=0, no-transform', Pragma: 'no-cache', Expire: '0',
|
45 |
|
46 | 'X-Accel-Buffering': 'no' }));
|
47 | destination.flushHeaders();
|
48 | }
|
49 | destination.write('\n');
|
50 | return super.pipe(destination, options);
|
51 | }
|
52 | _transform(message, encoding, callback) {
|
53 | let data = message.type ? `event: ${message.type}\n` : '';
|
54 | data += message.id ? `id: ${message.id}\n` : '';
|
55 | data += message.retry ? `retry: ${message.retry}\n` : '';
|
56 | data += message.data ? toDataString(message.data) : '';
|
57 | data += '\n';
|
58 | this.push(data);
|
59 | callback();
|
60 | }
|
61 | |
62 |
|
63 |
|
64 | writeMessage(message, cb) {
|
65 | if (!message.id) {
|
66 | this.lastEventId++;
|
67 | message.id = this.lastEventId.toString();
|
68 | }
|
69 | if (!this.write(message, 'utf-8', cb)) {
|
70 | this.once('drain', cb);
|
71 | }
|
72 | else {
|
73 | process.nextTick(cb);
|
74 | }
|
75 | }
|
76 | }
|
77 | exports.SseStream = SseStream;
|