1 | import { Transform } from 'stream'
|
2 | import { IncomingMessage, OutgoingHttpHeaders } from "http"
|
3 |
|
/**
 * Formats a payload as one or more SSE `data:` lines.
 *
 * Strings are used verbatim; every other value is serialized with
 * `JSON.stringify`. The text is then split on CRLF, CR or LF and each
 * resulting line is emitted as `data: <line>\n`, so embedded line breaks
 * never terminate the field early.
 *
 * @param data Payload to send. Typed as string or object, but non-string
 *   primitives passed by untyped callers (numbers, booleans) are serialized
 *   as JSON too instead of failing on `.split`.
 * @returns The concatenated `data:` lines, each terminated by `\n`.
 * @throws TypeError when the value has no JSON representation
 *   (`JSON.stringify` returned `undefined`).
 */
function dataString(data: string|object): string {
  let text: string
  if (typeof data === 'string') {
    text = data
  } else {
    // JSON.stringify returns undefined for values such as `undefined`,
    // functions, or objects whose toJSON() yields undefined.
    const json: string | undefined = JSON.stringify(data)
    if (json === undefined) {
      throw new TypeError('SSE data is not JSON-serializable')
    }
    text = json
  }
  return text.split(/\r\n|\r|\n/).map(line => `data: ${line}\n`).join('')
}
|
8 |
|
/**
 * One event accepted by `SseStream`. In `_transform`, each truthy field is
 * written as its own line(s), followed by a blank line that ends the event.
 */
interface Message {
  /** Payload; objects are JSON-serialized by `dataString`, strings are sent line by line. */
  data: string | object;
  /** Written as a `: <comment>` line. */
  comment?: string;
  /** Written as an `event: <event>` line. */
  event?: string;
  /** Written as an `id: <id>` line. */
  id?: string;
  /** Written as a `retry: <retry>` line. */
  retry?: number;
}
|
16 |
|
/**
 * Optional header methods a pipe destination may expose.
 * Both members are optional, so a plain writable stream also satisfies it.
 */
interface WriteHeaders {
  /**
   * When present, `SseStream.pipe` calls it with status 200 and the
   * event-stream response headers before any data is written.
   */
  writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders;

  /** Called by `SseStream.pipe` right after `writeHead`. */
  flushHeaders?(): void;
}
|
21 |
|
/**
 * Destination type accepted by `SseStream.pipe`: a writable stream that may
 * additionally provide the optional `writeHead` / `flushHeaders` methods
 * declared by `WriteHeaders`.
 */
export type HeaderStream =
  & NodeJS.WritableStream
  & WriteHeaders
|
23 |
|
24 |
|
25 |
|
26 |
|
27 |
|
28 |
|
29 |
|
30 |
|
31 |
|
32 |
|
33 |
|
34 |
|
35 |
|
36 |
|
/**
 * Transform stream that converts `Message` objects into Server-Sent Events
 * text. The stream is created in object mode; every written message is
 * pushed out as its field lines followed by a blank line that ends the event.
 */
export default class SseStream extends Transform {
  /**
   * @param req Optional incoming request. When given, its socket gets
   *   keep-alive and no-delay enabled and its timeout set to 0.
   */
  constructor(req?: IncomingMessage) {
    super({ objectMode: true })
    if (req) {
      req.socket.setKeepAlive(true)
      req.socket.setNoDelay(true)
      req.socket.setTimeout(0)
    }
  }

  /**
   * Pipes the event stream into `destination`.
   *
   * If the destination has `writeHead`, a 200 response with the event-stream
   * headers is written first (and flushed when `flushHeaders` exists). An
   * initial `:ok` comment frame is then written before piping starts.
   *
   * @param destination Writable target, typically an HTTP response.
   * @param options Forwarded unchanged to `Transform.pipe`.
   * @returns The destination, as returned by `Transform.pipe`.
   */
  pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean; }): T {
    if (destination.writeHead) {
      destination.writeHead(200, {
        'Content-Type': 'text/event-stream; charset=utf-8',
        'Transfer-Encoding': 'identity',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      })
      // flushHeaders is optional on HeaderStream; a destination may offer
      // writeHead without it, so guard instead of calling unconditionally.
      if (destination.flushHeaders) destination.flushHeaders()
    }

    destination.write(':ok\n\n')
    return super.pipe(destination, options)
  }

  /**
   * Serializes one message. Field order: comment, event, id, retry, data,
   * then the blank line that terminates the event.
   *
   * @param message Event to serialize.
   * @param encoding Unused.
   * @param callback Invoked without arguments once all lines are pushed.
   */
  _transform(message: Message, encoding: string, callback: (error?: (Error | null), data?: any) => void) {
    if (message.comment) {
      // Prefix every line: an unprefixed continuation line would be parsed
      // by clients as a field (or, if empty, as the end of the event).
      for (const line of message.comment.split(/\r\n|\r|\n/)) {
        this.push(`: ${line}\n`)
      }
    }
    if (message.event) this.push(`event: ${message.event}\n`)
    if (message.id) this.push(`id: ${message.id}\n`)
    // Presence check rather than truthiness: a retry of 0 is a valid value
    // and must not be dropped.
    if (message.retry != null) this.push(`retry: ${message.retry}\n`)
    // Falsy data (e.g. an empty string) emits no data line, which allows
    // frames that carry only a comment or other fields.
    if (message.data) this.push(dataString(message.data))
    this.push('\n')
    callback()
  }

  /**
   * Typed convenience wrapper around `write` for `Message` objects.
   *
   * @param message Event to enqueue.
   * @param encoding Passed through to `write`.
   * @param cb Passed through to `write`.
   * @returns Whatever `write` returns.
   */
  writeMessage(message: Message, encoding?: string, cb?: (error: Error | null | undefined) => void): boolean {
    return this.write(message, encoding, cb)
  }
}
|