UNPKG

2.47 kBPlain TextView Raw
1import { Transform } from 'stream'
2import { IncomingMessage, OutgoingHttpHeaders } from "http"
3
4function dataString(data: string|object): string {
5 if (typeof data === 'object') return dataString(JSON.stringify(data))
6 return data.split(/\r\n|\r|\n/).map(line => `data: ${line}\n`).join('')
7}
8
9interface Message {
10 data: string|object
11 comment?: string,
12 event?: string,
13 id?: string,
14 retry?: number,
15}
16
17interface WriteHeaders {
18 writeHead?(statusCode: number, headers?: OutgoingHttpHeaders): WriteHeaders
19 flushHeaders?(): void
20}
21
22export type HeaderStream = NodeJS.WritableStream & WriteHeaders
23
24/**
25 * Transforms "messages" to W3C event stream content.
26 * See https://html.spec.whatwg.org/multipage/server-sent-events.html
27 * A message is an object with one or more of the following properties:
28 * - data (String or object, which gets turned into JSON)
29 * - event
30 * - id
31 * - retry
32 * - comment
33 *
34 * If constructed with a HTTP Request, it will optimise the socket for streaming.
35 * If this stream is piped to an HTTP Response, it will set appropriate headers.
36 */
37export default class SseStream extends Transform {
38 constructor(req?: IncomingMessage) {
39 super({ objectMode: true })
40 if (req) {
41 req.socket.setKeepAlive(true)
42 req.socket.setNoDelay(true)
43 req.socket.setTimeout(0)
44 }
45 }
46
47 pipe<T extends HeaderStream>(destination: T, options?: { end?: boolean; }): T {
48 if (destination.writeHead) {
49 destination.writeHead(200, {
50 'Content-Type': 'text/event-stream; charset=utf-8',
51 'Transfer-Encoding': 'identity',
52 'Cache-Control': 'no-cache',
53 Connection: 'keep-alive',
54 })
55 destination.flushHeaders()
56 }
57 // Some clients (Safari) don't trigger onopen until the first frame is received.
58 destination.write(':ok\n\n')
59 return super.pipe(destination, options)
60 }
61
62 _transform(message: Message, encoding: string, callback: (error?: (Error | null), data?: any) => void) {
63 if (message.comment) this.push(`: ${message.comment}\n`)
64 if (message.event) this.push(`event: ${message.event}\n`)
65 if (message.id) this.push(`id: ${message.id}\n`)
66 if (message.retry) this.push(`retry: ${message.retry}\n`)
67 if (message.data) this.push(dataString(message.data))
68 this.push('\n')
69 callback()
70 }
71
72 writeMessage(message: Message, encoding?: string, cb?: (error: Error | null | undefined) => void): boolean {
73 return this.write(message, encoding, cb)
74 }
75}