1 | /**
|
2 | * Caterpillar supports piping to anything that supports this interface.
|
3 | * Which includes:
|
4 | * - {@link Transform Caterpillar Transforms}
|
5 | * - [Deno Writer Streams](https://doc.deno.land/https/github.com/denoland/deno/releases/latest/download/lib.deno.d.ts#Deno.Writer), e.g.
|
6 | * - [Deno.stdout](https://doc.deno.land/https/github.com/denoland/deno/releases/latest/download/lib.deno.d.ts#Deno.stdout)
|
7 | * - [Node.js Writable Streams](https://nodejs.org/dist/latest-v14.x/docs/api/stream.html#stream_writable_streams), e.g.
|
8 | * - [process.stdout](https://nodejs.org/dist/latest-v14.x/docs/api/process.html#process_process_stdout)
|
9 | * - [fs.createWriteStream](https://nodejs.org/dist/latest-v14.x/docs/api/fs.html#fs_fs_createwritestream_path_options)
|
10 | * - [WhatWG Writable Streams](https://developer.mozilla.org/en-US/docs/Web/API/WritableStream)
|
11 | */
|
12 | export interface Pipeable {
|
13 | write(chunk: any): any
|
14 | end?(cb?: () => void): void
|
15 | close?(): Promise<void> | void
|
16 | }
|
17 |
|
18 | /**
|
19 | * Caterpillar Transform Class.
|
20 | * Provides the methods needed to provide a pipable Caterpillar Transform.
|
21 | * Such that all you need to do is write your {@link Transform.format} method.
|
22 | * It can pipe to anything that provides a {@link Pipeable.write} method.
|
23 | * @example [Writing a Custom Transform](https://repl.it/@balupton/caterpillar-custom-transform)
|
24 | */
|
25 | export class Transform implements Pipeable {
|
26 | /** Where is this Transform piping to? */
|
27 | private pipes: Array<Pipeable> = []
|
28 |
|
29 | /**
|
30 | * Format the received log entry representation.
|
31 | * Your transformer should extend this.
|
32 | */
|
33 | format(message: any): any {
|
34 | return message
|
35 | }
|
36 |
|
37 | /** Pipe future log entries into a caterpillar transform or a stream. */
|
38 | pipe<T extends Pipeable>(to: T) {
|
39 | this.pipes.push(to)
|
40 | return to
|
41 | }
|
42 |
|
43 | /** Maintain a write queue such that multiple Deno writes do not stall */
|
44 | private writer = Promise.resolve()
|
45 |
|
46 | /** Write to the child pipes. */
|
47 | write(chunk: any) {
|
48 | // format now, so that we have the correct stack
|
49 | const data = this.format(chunk)
|
50 | // exclude filtered entries
|
51 | if (data == null) return this.writer
|
52 | // now delegate back to the pipe
|
53 | this.writer = this.writer.then(async () => {
|
54 | // pipe to child transforms and streams
|
55 | for (const pipe of this.pipes) {
|
56 | if (pipe instanceof Transform) {
|
57 | // compatibility with caterpillar transforms
|
58 | await pipe.write(data)
|
59 | } else {
|
60 | const str = typeof data === 'string' ? data : JSON.stringify(data)
|
61 | // requires typescript dom lib to define TextEncoder global
|
62 | if (typeof TextEncoder !== 'undefined') {
|
63 | // compatibility with deno and later node streams
|
64 | await pipe.write(new TextEncoder().encode(str))
|
65 | } else {
|
66 | // compatibility with earlier node streams
|
67 | await pipe.write(str)
|
68 | }
|
69 | }
|
70 | }
|
71 | })
|
72 | return this.writer
|
73 | }
|
74 |
|
75 | /** Close the child pipes. */
|
76 | async close(): Promise<void> {
|
77 | await Promise.all(
|
78 | this.pipes.map((pipe) => {
|
79 | if (pipe.close) {
|
80 | return pipe.close()
|
81 | } else if (pipe.end) {
|
82 | return new Promise<void>(function (resolve) {
|
83 | pipe.end!(resolve)
|
84 | })
|
85 | } else {
|
86 | return Promise.resolve()
|
87 | }
|
88 | })
|
89 | )
|
90 | }
|
91 |
|
92 | /* Callback alias for close */
|
93 | end(cb?: () => void) {
|
94 | const p = this.close()
|
95 | if (cb) p.finally(cb)
|
96 | }
|
97 | }
|
98 |
|
\ | No newline at end of file |